[VOL-4291] Rw-core updates for gRPC migration

Remove the Kafka inter-container proxy code from the vendored
voltha-lib-go v5 tree: the adapterif AdapterProxy and CoreProxy
interfaces and their common implementations (adapter_proxy.go,
core_proxy.go, request_handler.go, common.go) are deleted now that
rw-core's inter-component communication moves from the Kafka
inter-container proxy to gRPC.
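
For context, a minimal illustrative sketch (not part of this change) of
how an adapter used the now-removed Kafka-based CoreProxy, using only the
constructor and method signatures visible in the deletions below; the
package name, topic names and device id are placeholders:

    package example

    import (
    	"context"

    	"github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
    	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
    	"github.com/opencord/voltha-protos/v4/go/voltha"
    )

    // reportDeviceUp shows the pre-gRPC pattern: core RPCs were issued
    // over the Kafka inter-container proxy via common.CoreProxy.
    func reportDeviceUp(ctx context.Context, kafkaProxy kafka.InterContainerProxy) error {
    	// Topic names here are placeholders; real adapters read them from config.
    	cp := common.NewCoreProxy(ctx, kafkaProxy, "adapter-topic", "rwcore")

    	// Device state updates were sent to rw-core as Kafka RPCs; this is the
    	// coupling the gRPC migration removes.
    	return cp.DeviceStateUpdate(ctx, "device-id",
    		voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
    }
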
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif/adapter_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif/adapter_proxy_if.go
deleted file mode 100644
index c514d6d..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif/adapter_proxy_if.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package adapterif
-
-import (
-	"context"
-
-	"github.com/golang/protobuf/proto"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-)
-
-// AdapterProxy interface for AdapterProxy implementation.
-type AdapterProxy interface {
-	SendInterAdapterMessage(ctx context.Context,
-		msg proto.Message,
-		msgType ic.InterAdapterMessageType_Types,
-		fromAdapter string,
-		toAdapter string,
-		toDeviceID string,
-		proxyDeviceID string,
-		messageID string) error
-	TechProfileInstanceRequest(ctx context.Context,
-		tpPath string,
-		ponIntfID uint32,
-		onuID uint32,
-		uniID uint32,
-		fromAdapter string,
-		toAdapter string,
-		toDeviceID string,
-		proxyDeviceID string) (*ic.InterAdapterTechProfileDownloadMessage, error)
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif/core_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif/core_proxy_if.go
deleted file mode 100644
index 36939bd..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif/core_proxy_if.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package adapterif
-
-import (
-	"context"
-
-	"github.com/opencord/voltha-protos/v4/go/voltha"
-)
-
-// CoreProxy interface for voltha-go coreproxy.
-type CoreProxy interface {
-	UpdateCoreReference(deviceID string, coreReference string)
-	DeleteCoreReference(deviceID string)
-	RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
-	DeviceUpdate(ctx context.Context, device *voltha.Device) error
-	PortCreated(ctx context.Context, deviceID string, port *voltha.Port) error
-	PortsStateUpdate(ctx context.Context, deviceID string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error
-	DeleteAllPorts(ctx context.Context, deviceID string) error
-	GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error)
-	ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error)
-	DeviceStateUpdate(ctx context.Context, deviceID string,
-		connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error
-
-	DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error
-	ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int,
-		childDeviceType string, channelID int, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error)
-
-	ChildDevicesLost(ctx context.Context, parentDeviceID string) error
-	ChildDevicesDetected(ctx context.Context, parentDeviceID string) error
-	GetDevice(ctx context.Context, parentDeviceID string, deviceID string) (*voltha.Device, error)
-	GetChildDevice(ctx context.Context, parentDeviceID string, kwargs map[string]interface{}) (*voltha.Device, error)
-	GetChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error)
-	SendPacketIn(ctx context.Context, deviceID string, port uint32, pktPayload []byte) error
-	DeviceReasonUpdate(ctx context.Context, deviceID string, deviceReason string) error
-	PortStateUpdate(ctx context.Context, deviceID string, pType voltha.Port_PortType, portNo uint32,
-		operStatus voltha.OperStatus_Types) error
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/adapter_proxy.go
deleted file mode 100644
index fc31041..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/adapter_proxy.go
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package common
-
-import (
-	"context"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db"
-	"google.golang.org/grpc/status"
-
-	"github.com/golang/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/any"
-	"github.com/google/uuid"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-)
-
-type AdapterProxy struct {
-	kafkaICProxy kafka.InterContainerProxy
-	coreTopic    string
-	endpointMgr  kafka.EndpointManager
-}
-
-func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
-	proxy := AdapterProxy{
-		kafkaICProxy: kafkaProxy,
-		coreTopic:    coreTopic,
-		endpointMgr:  kafka.NewEndpointManager(backend),
-	}
-	logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
-	return &proxy
-}
-
-func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
-	msg proto.Message,
-	msgType ic.InterAdapterMessageType_Types,
-	fromAdapter string,
-	toAdapter string,
-	toDeviceId string,
-	proxyDeviceId string,
-	messageId string) error {
-	logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
-		"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
-
-	//Marshal the message
-	var marshalledMsg *any.Any
-	var err error
-	if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
-		logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
-		return err
-	}
-
-	// Set up the required rpc arguments
-	endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
-	if err != nil {
-		return err
-	}
-
-	//Build the inter adapter message
-	header := &ic.InterAdapterHeader{
-		Type:          msgType,
-		FromTopic:     fromAdapter,
-		ToTopic:       string(endpoint),
-		ToDeviceId:    toDeviceId,
-		ProxyDeviceId: proxyDeviceId,
-	}
-	if messageId != "" {
-		header.Id = messageId
-	} else {
-		header.Id = uuid.New().String()
-	}
-	header.Timestamp = ptypes.TimestampNow()
-	iaMsg := &ic.InterAdapterMessage{
-		Header: header,
-		Body:   marshalledMsg,
-	}
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "msg",
-		Value: iaMsg,
-	}
-
-	topic := kafka.Topic{Name: string(endpoint)}
-	replyToTopic := kafka.Topic{Name: fromAdapter}
-	rpc := "process_inter_adapter_message"
-
-	// Add a indication in context to differentiate this Inter Adapter message during Span processing in Kafka IC proxy
-	ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
-	logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
-	return unPackResponse(ctx, rpc, "", success, result)
-}
-
-func (ap *AdapterProxy) TechProfileInstanceRequest(ctx context.Context,
-	tpPath string,
-	parentPonPort uint32,
-	onuID uint32,
-	uniID uint32,
-	fromAdapter string,
-	toAdapter string,
-	toDeviceId string,
-	proxyDeviceId string) (*ic.InterAdapterTechProfileDownloadMessage, error) {
-	logger.Debugw(ctx, "sending-tech-profile-instance-request-message", log.Fields{"from": fromAdapter,
-		"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
-
-	// Set up the required rpc arguments
-	endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
-	if err != nil {
-		return nil, err
-	}
-
-	//Build the inter adapter message
-	tpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{
-		TpInstancePath: tpPath,
-		ParentDeviceId: toDeviceId,
-		ParentPonPort:  parentPonPort,
-		OnuId:          onuID,
-		UniId:          uniID,
-	}
-
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "msg",
-		Value: tpReqMsg,
-	}
-
-	topic := kafka.Topic{Name: string(endpoint)}
-	replyToTopic := kafka.Topic{Name: fromAdapter}
-	rpc := "process_tech_profile_instance_request"
-
-	ctx = context.WithValue(ctx, "inter-adapter-tp-req-msg", tpPath)
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
-	logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
-	if success {
-		tpDwnldMsg := &ic.InterAdapterTechProfileDownloadMessage{}
-		if err := ptypes.UnmarshalAny(result, tpDwnldMsg); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, err
-		}
-		return tpDwnldMsg, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "TechProfileInstanceRequest-return", log.Fields{"tpPath": tpPath, "success": success, "error": err})
-
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/common.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/common.go
deleted file mode 100644
index 98085bb..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/common.go
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package common
-
-import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-)
-
-var logger log.CLogger
-
-func init() {
-	// Setup this package so that it's log level can be modified at run time
-	var err error
-	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
-	if err != nil {
-		panic(err)
-	}
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/core_proxy.go
deleted file mode 100644
index 589d951..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/core_proxy.go
+++ /dev/null
@@ -1,689 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package common
-
-import (
-	"context"
-	"sync"
-
-	"github.com/golang/protobuf/ptypes"
-	a "github.com/golang/protobuf/ptypes/any"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-)
-
-type CoreProxy struct {
-	kafkaICProxy        kafka.InterContainerProxy
-	adapterTopic        string
-	coreTopic           string
-	deviceIdCoreMap     map[string]string
-	lockDeviceIdCoreMap sync.RWMutex
-}
-
-func NewCoreProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
-	var proxy CoreProxy
-	proxy.kafkaICProxy = kafkaProxy
-	proxy.adapterTopic = adapterTopic
-	proxy.coreTopic = coreTopic
-	proxy.deviceIdCoreMap = make(map[string]string)
-	proxy.lockDeviceIdCoreMap = sync.RWMutex{}
-	logger.Debugw(ctx, "TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
-
-	return &proxy
-}
-
-func unPackResponse(ctx context.Context, rpc string, deviceId string, success bool, response *a.Any) error {
-	if success {
-		return nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "response", log.Fields{"rpc": rpc, "device-id": deviceId, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
-	}
-}
-
-// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
-func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
-	ap.lockDeviceIdCoreMap.Lock()
-	defer ap.lockDeviceIdCoreMap.Unlock()
-	ap.deviceIdCoreMap[deviceId] = coreReference
-}
-
-// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
-func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
-	ap.lockDeviceIdCoreMap.Lock()
-	defer ap.lockDeviceIdCoreMap.Unlock()
-	delete(ap.deviceIdCoreMap, deviceId)
-}
-
-func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
-	ap.lockDeviceIdCoreMap.Lock()
-	defer ap.lockDeviceIdCoreMap.Unlock()
-
-	if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
-		return kafka.Topic{Name: t}
-	}
-
-	return kafka.Topic{Name: ap.coreTopic}
-}
-
-func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
-	return kafka.Topic{Name: ap.adapterTopic}
-}
-
-func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
-	logger.Debugw(ctx, "registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
-	rpc := "Register"
-	topic := kafka.Topic{Name: ap.coreTopic}
-	replyToTopic := ap.getAdapterTopic()
-	args := make([]*kafka.KVArg, 2)
-
-	if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
-		logger.Fatal(ctx, "totalReplicas can't be 0, since you're here you have at least one")
-	}
-
-	if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
-		logger.Fatal(ctx, "currentReplica can't be 0, it has to start from 1")
-	}
-
-	if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
-		// if the adapter is not setting these fields they default to 0,
-		// in that case it means the adapter is not ready to be scaled and thus it defaults
-		// to a single instance
-		adapter.CurrentReplica = 1
-		adapter.TotalReplicas = 1
-	}
-
-	if adapter.CurrentReplica > adapter.TotalReplicas {
-		logger.Fatalf(ctx, "CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
-			adapter.CurrentReplica, adapter.TotalReplicas)
-	}
-
-	args[0] = &kafka.KVArg{
-		Key:   "adapter",
-		Value: adapter,
-	}
-	args[1] = &kafka.KVArg{
-		Key:   "deviceTypes",
-		Value: deviceTypes,
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
-	logger.Debugw(ctx, "Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
-	return unPackResponse(ctx, rpc, "", success, result)
-}
-
-func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw(ctx, "DeviceUpdate", log.Fields{"device-id": device.Id})
-	rpc := "DeviceUpdate"
-	toTopic := ap.getCoreTopic(device.Id)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
-	}
-	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"device-id": device.Id, "success": success})
-	return unPackResponse(ctx, rpc, device.Id, success, result)
-}
-
-func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
-	logger.Debugw(ctx, "PortCreated", log.Fields{"portNo": port.PortNo})
-	rpc := "PortCreated"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	args := make([]*kafka.KVArg, 2)
-	id := &voltha.ID{Id: deviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-	args[1] = &kafka.KVArg{
-		Key:   "port",
-		Value: port,
-	}
-
-	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "PortCreated-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"device-id": deviceId})
-	rpc := "PortsStateUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	args := []*kafka.KVArg{{
-		Key:   "device_id",
-		Value: &voltha.ID{Id: deviceId},
-	}, {
-		Key:   "port_type_filter",
-		Value: &ic.IntType{Val: int64(portTypeFilter)},
-	}, {
-		Key:   "oper_status",
-		Value: &ic.IntType{Val: int64(operStatus)},
-	}}
-
-	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
-	logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceId})
-	rpc := "DeleteAllPorts"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	args := make([]*kafka.KVArg, 2)
-	id := &voltha.ID{Id: deviceId}
-
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-
-	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error) {
-	logger.Debugw(ctx, "GetDevicePort", log.Fields{"device-id": deviceID})
-	rpc := "GetDevicePort"
-
-	toTopic := ap.getCoreTopic(deviceID)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := []*kafka.KVArg{{
-		Key:   "device_id",
-		Value: &voltha.ID{Id: deviceID},
-	}, {
-		Key:   "port_no",
-		Value: &ic.IntType{Val: int64(portNo)},
-	}}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
-	logger.Debugw(ctx, "GetDevicePort-response", log.Fields{"device-id": deviceID, "success": success})
-
-	if success {
-		port := &voltha.Port{}
-		if err := ptypes.UnmarshalAny(result, port); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Error(codes.InvalidArgument, err.Error())
-		}
-		return port, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "GetDevicePort-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-}
-
-func (ap *CoreProxy) ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error) {
-	logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
-	rpc := "ListDevicePorts"
-
-	toTopic := ap.getCoreTopic(deviceID)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := []*kafka.KVArg{{
-		Key:   "device_id",
-		Value: &voltha.ID{Id: deviceID},
-	}}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
-	logger.Debugw(ctx, "ListDevicePorts-response", log.Fields{"device-id": deviceID, "success": success})
-
-	if success {
-		ports := &voltha.Ports{}
-		if err := ptypes.UnmarshalAny(result, ports); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Error(codes.InvalidArgument, err.Error())
-		}
-		return ports.Items, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "ListDevicePorts-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-}
-
-func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
-	connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"device-id": deviceId})
-	rpc := "DeviceStateUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	args := make([]*kafka.KVArg, 3)
-	id := &voltha.ID{Id: deviceId}
-	oStatus := &ic.IntType{Val: int64(operStatus)}
-	cStatus := &ic.IntType{Val: int64(connStatus)}
-
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-	args[1] = &kafka.KVArg{
-		Key:   "oper_status",
-		Value: oStatus,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "connect_status",
-		Value: cStatus,
-	}
-	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
-	childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
-	logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": parentDeviceId, "channelId": channelId})
-	rpc := "ChildDeviceDetected"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 7)
-	id := &voltha.ID{Id: parentDeviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "parent_device_id",
-		Value: id,
-	}
-	ppn := &ic.IntType{Val: int64(parentPortNo)}
-	args[1] = &kafka.KVArg{
-		Key:   "parent_port_no",
-		Value: ppn,
-	}
-	cdt := &ic.StrType{Val: childDeviceType}
-	args[2] = &kafka.KVArg{
-		Key:   "child_device_type",
-		Value: cdt,
-	}
-	channel := &ic.IntType{Val: int64(channelId)}
-	args[3] = &kafka.KVArg{
-		Key:   "channel_id",
-		Value: channel,
-	}
-	vId := &ic.StrType{Val: vendorId}
-	args[4] = &kafka.KVArg{
-		Key:   "vendor_id",
-		Value: vId,
-	}
-	sNo := &ic.StrType{Val: serialNumber}
-	args[5] = &kafka.KVArg{
-		Key:   "serial_number",
-		Value: sNo,
-	}
-	oId := &ic.IntType{Val: int64(onuId)}
-	args[6] = &kafka.KVArg{
-		Key:   "onu_id",
-		Value: oId,
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-
-	if success {
-		volthaDevice := &voltha.Device{}
-		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Error(codes.InvalidArgument, err.Error())
-		}
-		return volthaDevice, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "ChildDeviceDetected-return", log.Fields{"device-id": parentDeviceId, "success": success, "error": err})
-
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-
-}
-
-func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
-	logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"parent-device-id": parentDeviceId})
-	rpc := "ChildDevicesLost"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 1)
-	id := &voltha.ID{Id: parentDeviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "parent_device_id",
-		Value: id,
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-	return unPackResponse(ctx, rpc, parentDeviceId, success, result)
-}
-
-func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
-	logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"parent-device-id": parentDeviceId})
-	rpc := "ChildDevicesDetected"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 1)
-	id := &voltha.ID{Id: parentDeviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "parent_device_id",
-		Value: id,
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-	return unPackResponse(ctx, rpc, parentDeviceId, success, result)
-}
-
-func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
-	logger.Debugw(ctx, "GetDevice", log.Fields{"device-id": deviceId})
-	rpc := "GetDevice"
-
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 1)
-	id := &voltha.ID{Id: deviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "GetDevice-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-
-	if success {
-		volthaDevice := &voltha.Device{}
-		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Error(codes.InvalidArgument, err.Error())
-		}
-		return volthaDevice, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "GetDevice-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-}
-
-func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
-	logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": parentDeviceId, "kwargs": kwargs})
-	rpc := "GetChildDevice"
-
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 4)
-	id := &voltha.ID{Id: parentDeviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-
-	var cnt uint8 = 0
-	for k, v := range kwargs {
-		cnt += 1
-		if k == "serial_number" {
-			val := &ic.StrType{Val: v.(string)}
-			args[cnt] = &kafka.KVArg{
-				Key:   k,
-				Value: val,
-			}
-		} else if k == "onu_id" {
-			val := &ic.IntType{Val: int64(v.(uint32))}
-			args[cnt] = &kafka.KVArg{
-				Key:   k,
-				Value: val,
-			}
-		} else if k == "parent_port_no" {
-			val := &ic.IntType{Val: int64(v.(uint32))}
-			args[cnt] = &kafka.KVArg{
-				Key:   k,
-				Value: val,
-			}
-		}
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-
-	if success {
-		volthaDevice := &voltha.Device{}
-		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Error(codes.InvalidArgument, err.Error())
-		}
-		return volthaDevice, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "GetChildDevice-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
-
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-}
-
-func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
-	logger.Debugw(ctx, "GetChildDevices", log.Fields{"parent-device-id": parentDeviceId})
-	rpc := "GetChildDevices"
-
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 1)
-	id := &voltha.ID{Id: parentDeviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-
-	if success {
-		volthaDevices := &voltha.Devices{}
-		if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Error(codes.InvalidArgument, err.Error())
-		}
-		return volthaDevices, nil
-	} else {
-		unpackResult := &ic.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		logger.Debugw(ctx, "GetChildDevices-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
-
-		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
-	}
-}
-
-func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
-	logger.Debugw(ctx, "SendPacketIn", log.Fields{"device-id": deviceId, "port": port, "pktPayload": pktPayload})
-	rpc := "PacketIn"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 3)
-	id := &voltha.ID{Id: deviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-	portNo := &ic.IntType{Val: int64(port)}
-	args[1] = &kafka.KVArg{
-		Key:   "port",
-		Value: portNo,
-	}
-	pkt := &ic.Packet{Payload: pktPayload}
-	args[2] = &kafka.KVArg{
-		Key:   "packet",
-		Value: pkt,
-	}
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
-	logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"device-id": deviceId, "deviceReason": deviceReason})
-	rpc := "DeviceReasonUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 2)
-	id := &voltha.ID{Id: deviceId}
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: id,
-	}
-	reason := &ic.StrType{Val: deviceReason}
-	args[1] = &kafka.KVArg{
-		Key:   "device_reason",
-		Value: reason,
-	}
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "DeviceReason-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
-	rpc := "DevicePMConfigUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(pmConfigs.Id)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device_pm_config",
-		Value: pmConfigs,
-	}
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
-	logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pmconfig-device-id": pmConfigs.Id, "success": success})
-	return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
-}
-
-func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
-	logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parent-device-id": parentDeviceId})
-	rpc := "ReconcileChildDevices"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(parentDeviceId)
-	replyToTopic := ap.getAdapterTopic()
-
-	args := []*kafka.KVArg{
-		{Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
-	}
-
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
-	logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
-	return unPackResponse(ctx, rpc, parentDeviceId, success, result)
-}
-
-func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
-	operStatus voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "PortStateUpdate", log.Fields{"device-id": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
-	rpc := "PortStateUpdate"
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	toTopic := ap.getCoreTopic(deviceId)
-	args := make([]*kafka.KVArg, 4)
-	deviceID := &voltha.ID{Id: deviceId}
-	portNo := &ic.IntType{Val: int64(portNum)}
-	portType := &ic.IntType{Val: int64(pType)}
-	oStatus := &ic.IntType{Val: int64(operStatus)}
-
-	args[0] = &kafka.KVArg{
-		Key:   "device_id",
-		Value: deviceID,
-	}
-	args[1] = &kafka.KVArg{
-		Key:   "oper_status",
-		Value: oStatus,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "port_type",
-		Value: portType,
-	}
-	args[3] = &kafka.KVArg{
-		Key:   "port_no",
-		Value: portNo,
-	}
-
-	// Use a device specific topic as we are the only adaptercore handling requests for this device
-	replyToTopic := ap.getAdapterTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
-	logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
-	return unPackResponse(ctx, rpc, deviceId, success, result)
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/request_handler.go
deleted file mode 100644
index 90f575b..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/request_handler.go
+++ /dev/null
@@ -1,1011 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package common
-
-import (
-	"context"
-	"errors"
-
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/empty"
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters"
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/extension"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-)
-
-type RequestHandlerProxy struct {
-	TestMode       bool
-	coreInstanceId string
-	adapter        adapters.IAdapter
-	coreProxy      adapterif.CoreProxy
-}
-
-func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy adapterif.CoreProxy) *RequestHandlerProxy {
-	var proxy RequestHandlerProxy
-	proxy.coreInstanceId = coreInstanceId
-	proxy.adapter = iadapter
-	proxy.coreProxy = cProxy
-	return &proxy
-}
-
-func (rhp *RequestHandlerProxy) Adapter_descriptor() (*empty.Empty, error) {
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Device_types() (*voltha.DeviceTypes, error) {
-	return nil, nil
-}
-
-func (rhp *RequestHandlerProxy) Health() (*voltha.HealthStatus, error) {
-	return nil, nil
-}
-
-func (rhp *RequestHandlerProxy) Adopt_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-
-	logger.Debugw(ctx, "Adopt_device", log.Fields{"deviceId": device.Id})
-
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-
-	//Invoke the adopt device on the adapter
-	if err := rhp.adapter.Adopt_device(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Reconcile_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-
-	//Invoke the reconcile device API on the adapter
-	if err := rhp.adapter.Reconcile_device(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Abandon_device(args []*ic.Argument) (*empty.Empty, error) {
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Disable_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-	//Invoke the Disable_device API on the adapter
-	if err := rhp.adapter.Disable_device(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Reenable_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-	//Invoke the Reenable_device API on the adapter
-	if err := rhp.adapter.Reenable_device(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Reboot_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-	//Invoke the Reboot_device API on the adapter
-	if err := rhp.adapter.Reboot_device(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-
-}
-
-func (rhp *RequestHandlerProxy) Self_test_device(args []*ic.Argument) (*empty.Empty, error) {
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Delete_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
-	//Invoke the delete_device API on the adapter
-	if err := rhp.adapter.Delete_device(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Get_device_details(args []*ic.Argument) (*empty.Empty, error) {
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Update_flows_bulk(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	logger.Debug(ctx, "Update_flows_bulk")
-	if len(args) < 5 {
-		logger.Warn(ctx, "Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	flows := &voltha.Flows{}
-	flowMetadata := &voltha.FlowMetadata{}
-	groups := &voltha.FlowGroups{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case "flows":
-			if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-flows", log.Fields{"error": err})
-				return nil, err
-			}
-		case "groups":
-			if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-groups", log.Fields{"error": err})
-				return nil, err
-			}
-		case "flow_metadata":
-			if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-metadata", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	logger.Debugw(ctx, "Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
-	//Invoke the bulk flow update API of the adapter
-	if err := rhp.adapter.Update_flows_bulk(ctx, device, flows, groups, flowMetadata); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Update_flows_incrementally(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	logger.Debug(ctx, "Update_flows_incrementally")
-	if len(args) < 5 {
-		logger.Warn(ctx, "Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	flows := &openflow_13.FlowChanges{}
-	flowMetadata := &voltha.FlowMetadata{}
-	groups := &openflow_13.FlowGroupChanges{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case "flow_changes":
-			if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-flows", log.Fields{"error": err})
-				return nil, err
-			}
-		case "group_changes":
-			if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-groups", log.Fields{"error": err})
-				return nil, err
-			}
-		case "flow_metadata":
-			if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-metadata", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	logger.Debugw(ctx, "Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
-	//Invoke the incremental flow update API of the adapter
-	if err := rhp.adapter.Update_flows_incrementally(ctx, device, flows, groups, flowMetadata); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Update_pm_config(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	logger.Debug(ctx, "Update_pm_config")
-	if len(args) < 2 {
-		logger.Warn(ctx, "Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	pmConfigs := &voltha.PmConfigs{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case "pm_configs":
-			if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-pm-configs", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	logger.Debugw(ctx, "Update_pm_config", log.Fields{"device-id": device.Id, "pmConfigs": pmConfigs})
-	//Invoke the pm config update API of the adapter
-	if err := rhp.adapter.Update_pm_config(ctx, device, pmConfigs); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Receive_packet_out(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	logger.Debugw(ctx, "Receive_packet_out", log.Fields{"args": args})
-	if len(args) < 3 {
-		logger.Warn(ctx, "Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	deviceId := &ic.StrType{}
-	egressPort := &ic.IntType{}
-	packet := &openflow_13.OfpPacketOut{}
-	transactionID := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "deviceId":
-			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
-				return nil, err
-			}
-		case "outPort":
-			if err := ptypes.UnmarshalAny(arg.Value, egressPort); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-egressPort", log.Fields{"error": err})
-				return nil, err
-			}
-		case "packet":
-			if err := ptypes.UnmarshalAny(arg.Value, packet); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-packet", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	logger.Debugw(ctx, "Receive_packet_out", log.Fields{"device-id": deviceId.Val, "outPort": egressPort, "packet": packet})
-	//Invoke the packet out handling on the adapter
-	if err := rhp.adapter.Receive_packet_out(ctx, deviceId.Val, int(egressPort.Val), packet); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Suppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Unsuppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Get_ofp_device_info(ctx context.Context, args []*ic.Argument) (*ic.SwitchCapability, error) {
-	if len(args) < 2 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	device := &voltha.Device{}
-	transactionID := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-
-	logger.Debugw(ctx, "Get_ofp_device_info", log.Fields{"device-id": device.Id})
-
-	var cap *ic.SwitchCapability
-	var err error
-	if cap, err = rhp.adapter.Get_ofp_device_info(ctx, device); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	logger.Debugw(ctx, "Get_ofp_device_info", log.Fields{"cap": cap})
-	return cap, nil
-}
-
-func (rhp *RequestHandlerProxy) Process_inter_adapter_message(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	iaMsg := &ic.InterAdapterMessage{}
-	transactionID := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "msg":
-			if err := ptypes.UnmarshalAny(arg.Value, iaMsg); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-msg", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-
-	logger.Debugw(ctx, "Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
-
-	//Invoke the inter adapter API on the handler
-	if err := rhp.adapter.Process_inter_adapter_message(ctx, iaMsg); err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-
-	return new(empty.Empty), nil
-}
-
-func (rhp *RequestHandlerProxy) Process_tech_profile_instance_request(ctx context.Context, args []*ic.Argument) (*ic.InterAdapterTechProfileDownloadMessage, error) {
-	if len(args) < 2 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-	iaTpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{}
-	transactionID := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "msg":
-			if err := ptypes.UnmarshalAny(arg.Value, iaTpReqMsg); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-msg", log.Fields{"error": err})
-				return nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-
-	logger.Debugw(ctx, "Process_tech_profile_instance_request", log.Fields{"tpPath": iaTpReqMsg.TpInstancePath})
-
-	//Invoke the tech profile instance request
-	tpInst := rhp.adapter.Process_tech_profile_instance_request(ctx, iaTpReqMsg)
-
-	return tpInst, nil
-}
-
-func (rhp *RequestHandlerProxy) Download_image(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
-	device, image, err := unMarshalImageDowload(args, ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	imageDownload, err := rhp.adapter.Download_image(ctx, device, image)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageDownload, nil
-}
-
-func (rhp *RequestHandlerProxy) Get_image_download_status(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
-	device, image, err := unMarshalImageDowload(args, ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	imageDownload, err := rhp.adapter.Get_image_download_status(ctx, device, image)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageDownload, nil
-}
-
-func (rhp *RequestHandlerProxy) Cancel_image_download(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
-	device, image, err := unMarshalImageDowload(args, ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	imageDownload, err := rhp.adapter.Cancel_image_download(ctx, device, image)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageDownload, nil
-}
-
-func (rhp *RequestHandlerProxy) Activate_image_update(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
-
-	device, image, err := unMarshalImageDowload(args, ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	imageDownload, err := rhp.adapter.Activate_image_update(ctx, device, image)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageDownload, nil
-}
-
-func (rhp *RequestHandlerProxy) Revert_image_update(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
-	device, image, err := unMarshalImageDowload(args, ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	imageDownload, err := rhp.adapter.Revert_image_update(ctx, device, image)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageDownload, nil
-}
-
-func unMarshalImageDowload(args []*ic.Argument, ctx context.Context) (*voltha.Device, *voltha.ImageDownload, error) {
-	if len(args) < 4 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, nil, err
-	}
-	device := &voltha.Device{}
-	image := &voltha.ImageDownload{}
-	transactionID := &ic.StrType{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, nil, err
-			}
-		case "request":
-			if err := ptypes.UnmarshalAny(arg.Value, image); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-image", log.Fields{"error": err})
-				return nil, nil, err
-			}
-		case kafka.TransactionKey:
-			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
-				return nil, nil, err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return nil, nil, err
-			}
-		}
-	}
-	return device, image, nil
-}
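
For context, a sketch of the sender-side counterpart of the helper above: each proto message is wrapped in an Any and keyed so the handler can pick it out again. The function name and its placement are illustrative only; they are not part of the deleted library.

package example

import (
	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes"
	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
	"github.com/opencord/voltha-protos/v4/go/voltha"
)

// packImageDownloadArgs is the sender-side counterpart of the helper above:
// each proto message is wrapped in an Any and keyed so the handler can
// recognise it ("device" and "request" match the cases decoded above).
func packImageDownloadArgs(device *voltha.Device, image *voltha.ImageDownload) ([]*ic.Argument, error) {
	pack := func(key string, msg proto.Message) (*ic.Argument, error) {
		val, err := ptypes.MarshalAny(msg)
		if err != nil {
			return nil, err
		}
		return &ic.Argument{Key: key, Value: val}, nil
	}
	devArg, err := pack("device", device)
	if err != nil {
		return nil, err
	}
	imgArg, err := pack("request", image)
	if err != nil {
		return nil, err
	}
	// A real caller also appends the transactionID and fromTopic arguments.
	return []*ic.Argument{devArg, imgArg}, nil
}
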
-
-func (rhp *RequestHandlerProxy) Enable_port(ctx context.Context, args []*ic.Argument) error {
-	logger.Debugw(ctx, "enable_port", log.Fields{"args": args})
-	deviceId, port, err := rhp.getEnableDisableParams(ctx, args)
-	if err != nil {
-		logger.Warnw(ctx, "enable_port", log.Fields{"args": args, "device-id": deviceId, "port": port})
-		return err
-	}
-	return rhp.adapter.Enable_port(ctx, deviceId, port)
-}
-
-func (rhp *RequestHandlerProxy) Disable_port(ctx context.Context, args []*ic.Argument) error {
-	logger.Debugw(ctx, "disable_port", log.Fields{"args": args})
-	deviceId, port, err := rhp.getEnableDisableParams(ctx, args)
-	if err != nil {
-		logger.Warnw(ctx, "disable_port", log.Fields{"args": args, "device-id": deviceId, "port": port})
-		return err
-	}
-	return rhp.adapter.Disable_port(ctx, deviceId, port)
-}
-
-func (rhp *RequestHandlerProxy) getEnableDisableParams(ctx context.Context, args []*ic.Argument) (string, *voltha.Port, error) {
-	logger.Debugw(ctx, "getEnableDisableParams", log.Fields{"args": args})
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		return "", nil, errors.New("invalid-number-of-args")
-	}
-	deviceId := &ic.StrType{}
-	port := &voltha.Port{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "deviceId":
-			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
-				return "", nil, err
-			}
-		case "port":
-			if err := ptypes.UnmarshalAny(arg.Value, port); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-port", log.Fields{"error": err})
-				return "", nil, err
-			}
-		}
-	}
-	return deviceId.Val, port, nil
-}
-
-func (rhp *RequestHandlerProxy) Child_device_lost(ctx context.Context, args []*ic.Argument) error {
-	if len(args) < 2 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		return errors.New("invalid-number-of-args")
-	}
-	childDevice := &voltha.Device{}
-	fromTopic := &ic.StrType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "childDevice":
-			if err := ptypes.UnmarshalAny(arg.Value, childDevice); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-child-device", log.Fields{"error": err})
-				return err
-			}
-		case kafka.FromTopic:
-			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
-				return err
-			}
-		}
-	}
-	//Update the core reference for that device
-	rhp.coreProxy.UpdateCoreReference(childDevice.ParentId, fromTopic.Val)
-	//Invoke the Child_device_lost API on the adapter
-	if err := rhp.adapter.Child_device_lost(ctx, childDevice); err != nil {
-		return status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return nil
-}
-
-func (rhp *RequestHandlerProxy) Start_omci_test(ctx context.Context, args []*ic.Argument) (*ic.TestResponse, error) {
-	if len(args) < 2 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		err := errors.New("invalid-number-of-args")
-		return nil, err
-	}
-
-	// TODO: See related comment in voltha-go:adapter_proxy_go:startOmciTest()
-	//   Second argument should perhaps be uuid instead of omcitestrequest
-
-	device := &voltha.Device{}
-	request := &voltha.OmciTestRequest{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case "omcitestrequest":
-			if err := ptypes.UnmarshalAny(arg.Value, request); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-omcitestrequest", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	logger.Debugw(ctx, "Start_omci_test", log.Fields{"device-id": device.Id, "req": request})
-	result, err := rhp.adapter.Start_omci_test(ctx, device, request)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return result, nil
-}
-func (rhp *RequestHandlerProxy) Get_ext_value(ctx context.Context, args []*ic.Argument) (*voltha.ReturnValues, error) {
-	if len(args) < 3 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		return nil, errors.New("invalid-number-of-args")
-	}
-
-	pDeviceId := &ic.StrType{}
-	device := &voltha.Device{}
-	valuetype := &ic.IntType{}
-	for _, arg := range args {
-		switch arg.Key {
-		case "device":
-			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
-				return nil, err
-			}
-		case "pDeviceId":
-			if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-parent-device-id", log.Fields{"error": err})
-				return nil, err
-			}
-		case "valuetype":
-			if err := ptypes.UnmarshalAny(arg.Value, valuetype); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-valuetype", log.Fields{"error": err})
-				return nil, err
-			}
-		default:
-			logger.Warnw(ctx, "key-not-found", log.Fields{"arg.Key": arg.Key})
-		}
-	}
-
-	//Invoke the Get_value API on the adapter
-	return rhp.adapter.Get_ext_value(ctx, pDeviceId.Val, device, voltha.ValueType_Type(valuetype.Val))
-}
-
-func (rhp *RequestHandlerProxy) Single_get_value_request(ctx context.Context, args []*ic.Argument) (*extension.SingleGetValueResponse, error) {
-	logger.Debugw(ctx, "req handler Single_get_value_request", log.Fields{"no of args": len(args), "args": args})
-
-	if len(args) < 1 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		return nil, errors.New("invalid-number-of-args")
-	}
-	singleGetvalueReq := extension.SingleGetValueRequest{}
-	errResp := func(status extension.GetValueResponse_Status, reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
-		return &extension.SingleGetValueResponse{
-			Response: &extension.GetValueResponse{
-				Status:    status,
-				ErrReason: reason,
-			},
-		}
-	}
-	for _, arg := range args {
-		switch arg.Key {
-		case "request":
-			if err := ptypes.UnmarshalAny(arg.Value, &singleGetvalueReq); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-singleGetvalueReq", log.Fields{"error": err})
-				return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_REASON_UNDEFINED), nil
-			}
-		default:
-			logger.Warnw(ctx, "key-not-found", log.Fields{"arg.Key": arg.Key})
-		}
-	}
-	logger.Debugw(ctx, "invoke rhp.adapter.Single_get_value_request ", log.Fields{"singleGetvalueReq": singleGetvalueReq})
-	return rhp.adapter.Single_get_value_request(ctx, singleGetvalueReq)
-}
-
-func (rhp *RequestHandlerProxy) getDeviceDownloadImageRequest(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageDownloadRequest, error) {
-	logger.Debugw(ctx, "getDeviceDownloadImageRequest", log.Fields{"args": args})
-	if len(args) < 1 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		return nil, errors.New("invalid-number-of-args")
-	}
-
-	deviceDownloadImageReq := ic.DeviceImageDownloadRequest{}
-
-	for _, arg := range args {
-		switch arg.Key {
-		case "deviceImageDownloadReq":
-			if err := ptypes.UnmarshalAny(arg.Value, &deviceDownloadImageReq); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device-image-download-request", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	return &deviceDownloadImageReq, nil
-}
-
-func (rhp *RequestHandlerProxy) getDeviceImageRequest(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageRequest, error) {
-	logger.Debugw(ctx, "getDeviceImageRequest", log.Fields{"args": args})
-	if len(args) < 1 {
-		logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
-		return nil, errors.New("invalid-number-of-args")
-	}
-
-	deviceImageReq := ic.DeviceImageRequest{}
-
-	for _, arg := range args {
-		switch arg.Key {
-		case "deviceImageReq":
-			if err := ptypes.UnmarshalAny(arg.Value, &deviceImageReq); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device-image-request", log.Fields{"error": err})
-				return nil, err
-			}
-		}
-	}
-	return &deviceImageReq, nil
-}
-
-func (rhp *RequestHandlerProxy) getDeviceID(ctx context.Context, args []*ic.Argument) (string, error) {
-	logger.Debugw(ctx, "getDeviceID", log.Fields{"args": args})
-
-	deviceId := &ic.StrType{}
-
-	for _, arg := range args {
-		switch arg.Key {
-		case "deviceId":
-			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
-				logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
-				return "", err
-			}
-		}
-	}
-
-	if deviceId.Val == "" {
-		return "", errors.New("invalid argument")
-	}
-
-	return deviceId.Val, nil
-}
-
-func (rhp *RequestHandlerProxy) Download_onu_image(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageResponse, error) {
-	imageDownloadReq, err := rhp.getDeviceDownloadImageRequest(ctx, args)
-	if err != nil {
-		return nil, err
-	}
-
-	imageDownloadRsp, err := rhp.adapter.Download_onu_image(ctx, imageDownloadReq)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageDownloadRsp, nil
-}
-
-func (rhp *RequestHandlerProxy) Get_onu_image_status(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageResponse, error) {
-	imageStatusReq, err := rhp.getDeviceImageRequest(ctx, args)
-	if err != nil {
-		return nil, err
-	}
-
-	imageStatus, err := rhp.adapter.Get_onu_image_status(ctx, imageStatusReq)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageStatus, nil
-}
-
-func (rhp *RequestHandlerProxy) Abort_onu_image_upgrade(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageResponse, error) {
-	imageAbortReq, err := rhp.getDeviceImageRequest(ctx, args)
-	if err != nil {
-		return nil, err
-	}
-
-	imageAbortRsp, err := rhp.adapter.Abort_onu_image_upgrade(ctx, imageAbortReq)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageAbortRsp, nil
-}
-
-func (rhp *RequestHandlerProxy) Activate_onu_image(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageResponse, error) {
-	imageActivateReq, err := rhp.getDeviceImageRequest(ctx, args)
-	if err != nil {
-		return nil, err
-	}
-
-	imageActivateRsp, err := rhp.adapter.Activate_onu_image(ctx, imageActivateReq)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return imageActivateRsp, nil
-}
-
-func (rhp *RequestHandlerProxy) Commit_onu_image(ctx context.Context, args []*ic.Argument) (*voltha.DeviceImageResponse, error) {
-	imageCommitReq, err := rhp.getDeviceImageRequest(ctx, args)
-	if err != nil {
-		return nil, err
-	}
-
-	imageCommitRsp, err := rhp.adapter.Commit_onu_image(ctx, imageCommitReq)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-
-	return imageCommitRsp, nil
-}
-
-func (rhp *RequestHandlerProxy) Get_onu_images(ctx context.Context, args []*ic.Argument) (*voltha.OnuImages, error) {
-	deviceID, err := rhp.getDeviceID(ctx, args)
-	if err != nil {
-		return nil, err
-	}
-
-	onuImages, err := rhp.adapter.Get_onu_images(ctx, deviceID)
-	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
-	}
-	return onuImages, nil
-}
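
Every handler in this removed file follows the same decode pattern: walk the []*ic.Argument list delivered over Kafka, match on arg.Key, and unmarshal the *any.Any payload into a pre-allocated proto message with ptypes.UnmarshalAny. A minimal, self-contained sketch of that pattern; the function name is illustrative, not part of the library.

package example

import (
	"errors"

	"github.com/golang/protobuf/ptypes"
	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
	"github.com/opencord/voltha-protos/v4/go/voltha"
)

// decodeDeviceArgs mirrors the decode pattern used by the request handlers:
// each expected key is unmarshalled into a pre-allocated proto message.
func decodeDeviceArgs(args []*ic.Argument) (*voltha.Device, string, error) {
	if len(args) < 2 {
		return nil, "", errors.New("invalid-number-of-args")
	}
	device := &voltha.Device{}
	transactionID := &ic.StrType{}
	for _, arg := range args {
		switch arg.Key {
		case "device":
			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
				return nil, "", err
			}
		case "transactionID": // same literal as kafka.TransactionKey
			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
				return nil, "", err
			}
		}
	}
	return device, transactionID.Val, nil
}
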
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/iAdapter.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/iAdapter.go
deleted file mode 100644
index aca4271..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/iAdapter.go
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package adapters
-
-import (
-	"context"
-
-	"github.com/opencord/voltha-protos/v4/go/extension"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
-)
-
-//IAdapter represents the set of APIs a voltha adapter has to support.
-type IAdapter interface {
-	Adapter_descriptor(ctx context.Context) error
-	Device_types(ctx context.Context) (*voltha.DeviceTypes, error)
-	Health(ctx context.Context) (*voltha.HealthStatus, error)
-	Adopt_device(ctx context.Context, device *voltha.Device) error
-	Reconcile_device(ctx context.Context, device *voltha.Device) error
-	Abandon_device(ctx context.Context, device *voltha.Device) error
-	Disable_device(ctx context.Context, device *voltha.Device) error
-	Reenable_device(ctx context.Context, device *voltha.Device) error
-	Reboot_device(ctx context.Context, device *voltha.Device) error
-	Self_test_device(ctx context.Context, device *voltha.Device) error
-	Delete_device(ctx context.Context, device *voltha.Device) error
-	Get_device_details(ctx context.Context, device *voltha.Device) error
-	Update_flows_bulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error
-	Update_flows_incrementally(ctx context.Context, device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error
-	Update_pm_config(ctx context.Context, device *voltha.Device, pm_configs *voltha.PmConfigs) error
-	Receive_packet_out(ctx context.Context, deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error
-	Suppress_event(ctx context.Context, filter *voltha.EventFilter) error
-	Unsuppress_event(ctx context.Context, filter *voltha.EventFilter) error
-	Get_ofp_device_info(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error)
-	Process_inter_adapter_message(ctx context.Context, msg *ic.InterAdapterMessage) error
-	Process_tech_profile_instance_request(ctx context.Context, msg *ic.InterAdapterTechProfileInstanceRequestMessage) *ic.InterAdapterTechProfileDownloadMessage
-	Download_image(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
-	Get_image_download_status(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
-	Cancel_image_download(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
-	Activate_image_update(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
-	Revert_image_update(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
-	Enable_port(ctx context.Context, deviceId string, port *voltha.Port) error
-	Disable_port(ctx context.Context, deviceId string, port *voltha.Port) error
-	Child_device_lost(ctx context.Context, childDevice *voltha.Device) error
-	Start_omci_test(ctx context.Context, device *voltha.Device, request *voltha.OmciTestRequest) (*voltha.TestResponse, error)
-	Get_ext_value(ctx context.Context, deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error)
-	Single_get_value_request(ctx context.Context, request extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error)
-	Download_onu_image(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error)
-	Get_onu_image_status(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error)
-	Abort_onu_image_upgrade(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error)
-	Get_onu_images(ctx context.Context, deviceID string) (*voltha.OnuImages, error)
-	Activate_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error)
-	Commit_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error)
-}
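
IAdapter is deliberately wide; a caller that only needs part of the surface can declare a narrower local interface that any full IAdapter implementation satisfies implicitly. A sketch under that assumption; the interface and function names are illustrative.

package example

import (
	"context"

	"github.com/opencord/voltha-protos/v4/go/openflow_13"
	"github.com/opencord/voltha-protos/v4/go/voltha"
)

// flowProgrammer is a hypothetical narrowed view of IAdapter: it names only the
// incremental flow update call, so any full IAdapter implementation satisfies it.
type flowProgrammer interface {
	Update_flows_incrementally(ctx context.Context, device *voltha.Device,
		flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges,
		flowMetadata *voltha.FlowMetadata) error
}

// pushFlows depends only on the narrowed interface, which keeps tests and
// mocks small; flowMetadata is passed through unchanged.
func pushFlows(ctx context.Context, fp flowProgrammer, device *voltha.Device,
	flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges,
	md *voltha.FlowMetadata) error {
	return fp.Update_flows_incrementally(ctx, device, flows, groups, md)
}
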
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/endpoint_manager.go
deleted file mode 100644
index 962b932..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/endpoint_manager.go
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
-	"context"
-	"fmt"
-	"github.com/buraksezer/consistent"
-	"github.com/cespare/xxhash"
-	"github.com/golang/protobuf/proto"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-	"sync"
-)
-
-const (
-	// All the values below can be tuned to get optimal data distribution.  The numbers below seem to work well when
-	// supporting 1000-10000 devices and 1-20 replicas of a service.
-
-	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
-	DefaultPartitionCount = 1117
-
-	// Represents how many times a node is replicated on the consistent ring.
-	DefaultReplicationFactor = 117
-
-	// Load is used to calculate average load.
-	DefaultLoad = 1.1
-)
-
-type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
-type ReplicaID int32 // The replication ID of a service instance
-
-type EndpointManager interface {
-
-	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
-	// now this will return the topic name
-	GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error)
-
-	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
-	// devices owned by that service need to be reconciled
-	IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error)
-
-	// GetReplicaAssignment returns the replica number of the service that owns the deviceID.  This is used by the
-	// test only
-	GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error)
-}
-
-type service struct {
-	id             string // Id of the service.  The same id is used for all replicas
-	totalReplicas  int32
-	replicas       map[ReplicaID]Endpoint
-	consistentRing *consistent.Consistent
-}
-
-type endpointManager struct {
-	partitionCount           int
-	replicationFactor        int
-	load                     float64
-	backend                  *db.Backend
-	services                 map[string]*service
-	servicesLock             sync.RWMutex
-	deviceTypeServiceMap     map[string]string
-	deviceTypeServiceMapLock sync.RWMutex
-}
-
-type EndpointManagerOption func(*endpointManager)
-
-func PartitionCount(count int) EndpointManagerOption {
-	return func(args *endpointManager) {
-		args.partitionCount = count
-	}
-}
-
-func ReplicationFactor(replicas int) EndpointManagerOption {
-	return func(args *endpointManager) {
-		args.replicationFactor = replicas
-	}
-}
-
-func Load(load float64) EndpointManagerOption {
-	return func(args *endpointManager) {
-		args.load = load
-	}
-}
-
-func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
-	tm := &endpointManager{
-		partitionCount:       DefaultPartitionCount,
-		replicationFactor:    DefaultReplicationFactor,
-		load:                 DefaultLoad,
-		backend:              backend,
-		services:             make(map[string]*service),
-		deviceTypeServiceMap: make(map[string]string),
-	}
-
-	for _, option := range opts {
-		option(tm)
-	}
-	return tm
-}
-
-func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
-	return newEndpointManager(backend, opts...)
-}
-
-func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error) {
-	logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
-	owner, err := ep.getOwner(ctx, deviceID, serviceType)
-	if err != nil {
-		return "", err
-	}
-	m, ok := owner.(Member)
-	if !ok {
-		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
-	}
-	endpoint := m.getEndPoint()
-	if endpoint == "" {
-		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
-	}
-	logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
-	return endpoint, nil
-}
-
-func (ep *endpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
-	logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
-	owner, err := ep.getOwner(ctx, deviceID, serviceType)
-	if err != nil {
-		return false, nil
-	}
-	m, ok := owner.(Member)
-	if !ok {
-		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
-	}
-	return m.getReplica() == ReplicaID(replicaNumber), nil
-}
-
-func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error) {
-	owner, err := ep.getOwner(ctx, deviceID, serviceType)
-	if err != nil {
-		return 0, nil
-	}
-	m, ok := owner.(Member)
-	if !ok {
-		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
-	}
-	return m.getReplica(), nil
-}
-
-func (ep *endpointManager) getOwner(ctx context.Context, deviceID string, serviceType string) (consistent.Member, error) {
-	serv, dType, err := ep.getServiceAndDeviceType(ctx, serviceType)
-	if err != nil {
-		return nil, err
-	}
-	key := ep.makeKey(deviceID, dType, serviceType)
-	return serv.consistentRing.LocateKey(key), nil
-}
-
-func (ep *endpointManager) getServiceAndDeviceType(ctx context.Context, serviceType string) (*service, string, error) {
-	// Check whether the service exists
-	ep.servicesLock.RLock()
-	serv, serviceExist := ep.services[serviceType]
-	ep.servicesLock.RUnlock()
-
-	// Load the service and device types if needed
-	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
-		if err := ep.loadServices(ctx); err != nil {
-			return nil, "", err
-		}
-
-		// Check whether the service exists now
-		ep.servicesLock.RLock()
-		serv, serviceExist = ep.services[serviceType]
-		ep.servicesLock.RUnlock()
-		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
-			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
-		}
-	}
-
-	ep.deviceTypeServiceMapLock.RLock()
-	defer ep.deviceTypeServiceMapLock.RUnlock()
-	for dType, sType := range ep.deviceTypeServiceMap {
-		if sType == serviceType {
-			return serv, dType, nil
-		}
-	}
-	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
-}
-
-func (ep *endpointManager) getConsistentConfig() consistent.Config {
-	return consistent.Config{
-		PartitionCount:    ep.partitionCount,
-		ReplicationFactor: ep.replicationFactor,
-		Load:              ep.load,
-		Hasher:            hasher{},
-	}
-}
-
-// loadServices loads the services (adapters) and device types into memory. Because the data set is small and stored
-// as binary protobuf in the DB, it is simpler to reload everything when an inconsistency is detected than to watch
-// the DB for updates and act on them.
-func (ep *endpointManager) loadServices(ctx context.Context) error {
-	ep.servicesLock.Lock()
-	defer ep.servicesLock.Unlock()
-	ep.deviceTypeServiceMapLock.Lock()
-	defer ep.deviceTypeServiceMapLock.Unlock()
-
-	if ep.backend == nil {
-		return status.Error(codes.Aborted, "backend-not-set")
-	}
-	ep.services = make(map[string]*service)
-	ep.deviceTypeServiceMap = make(map[string]string)
-
-	// Load the adapters
-	blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
-	if err != nil {
-		return err
-	}
-
-	// Data is marshalled as proto bytes in the data store
-	for _, blob := range blobs {
-		data := blob.Value.([]byte)
-		adapter := &voltha.Adapter{}
-		if err := proto.Unmarshal(data, adapter); err != nil {
-			return err
-		}
-		// A valid adapter should have the vendorID set
-		if adapter.Vendor != "" {
-			if _, ok := ep.services[adapter.Type]; !ok {
-				ep.services[adapter.Type] = &service{
-					id:             adapter.Type,
-					totalReplicas:  adapter.TotalReplicas,
-					replicas:       make(map[ReplicaID]Endpoint),
-					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
-				}
-
-			}
-			currentReplica := ReplicaID(adapter.CurrentReplica)
-			endpoint := Endpoint(adapter.Endpoint)
-			ep.services[adapter.Type].replicas[currentReplica] = endpoint
-			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
-		}
-	}
-	// Load the device types
-	blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
-	if err != nil {
-		return err
-	}
-	for _, blob := range blobs {
-		data := blob.Value.([]byte)
-		deviceType := &voltha.DeviceType{}
-		if err := proto.Unmarshal(data, deviceType); err != nil {
-			return err
-		}
-		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
-			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
-		}
-	}
-
-	// Log the loaded data in debug mode to facilitate troubleshooting
-	if logger.V(log.DebugLevel) {
-		for key, val := range ep.services {
-			members := val.consistentRing.GetMembers()
-			logger.Debugw(ctx, "service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
-			for _, m := range members {
-				n := m.(Member)
-				logger.Debugw(ctx, "service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
-			}
-		}
-		logger.Debugw(ctx, "device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
-	}
-	return nil
-}
-
-// makeKey creates the string that the hash function uses to create the hash
-func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
-	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
-}
-
-// The consistent package requires a hasher function
-type hasher struct{}
-
-// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
-// best distribution compared to other hash packages.
-func (h hasher) Sum64(data []byte) uint64 {
-	return xxhash.Sum64(data)
-}
-
-// Member represents a member on the consistent ring
-type Member interface {
-	String() string
-	getReplica() ReplicaID
-	getEndPoint() Endpoint
-	getID() string
-	getServiceType() string
-}
-
-// member implements the Member interface
-type member struct {
-	id          string
-	serviceType string
-	vendor      string
-	version     string
-	replica     ReplicaID
-	endpoint    Endpoint
-}
-
-func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
-	return &member{
-		id:          ID,
-		serviceType: serviceType,
-		vendor:      vendor,
-		version:     version,
-		replica:     replica,
-		endpoint:    endPoint,
-	}
-}
-
-func (m *member) String() string {
-	return string(m.endpoint)
-}
-
-func (m *member) getReplica() ReplicaID {
-	return m.replica
-}
-
-func (m *member) getEndPoint() Endpoint {
-	return m.endpoint
-}
-
-func (m *member) getID() string {
-	return m.id
-}
-
-func (m *member) getServiceType() string {
-	return m.serviceType
-}
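
For context, a minimal sketch of how a core component might construct and query the endpoint manager above. It assumes an already-initialised etcd-backed db.Backend; the service type string is only illustrative.

package example

import (
	"context"

	"github.com/opencord/voltha-lib-go/v5/pkg/db"
	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
)

// lookupAdapterTopic resolves the Endpoint (Kafka topic) of the adapter replica
// that owns a device, using the consistent-hash ring built from the KV store.
func lookupAdapterTopic(ctx context.Context, backend *db.Backend, deviceID string) (kafka.Endpoint, error) {
	em := kafka.NewEndpointManager(backend,
		kafka.PartitionCount(kafka.DefaultPartitionCount),
		kafka.ReplicationFactor(kafka.DefaultReplicationFactor),
		kafka.Load(kafka.DefaultLoad),
	)
	// "openolt" is only an illustrative service type; real values come from the
	// adapter registrations stored under the "adapters" prefix.
	return em.GetEndpoint(ctx, deviceID, "openolt")
}
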
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/kafka_inter_container_library.go
deleted file mode 100644
index 120ed45..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/kafka_inter_container_library.go
+++ /dev/null
@@ -1,1097 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"reflect"
-	"strings"
-	"sync"
-	"time"
-
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-
-	"github.com/golang/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/any"
-	"github.com/google/uuid"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opentracing/opentracing-go"
-)
-
-const (
-	DefaultMaxRetries     = 3
-	DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
-)
-
-const (
-	TransactionKey = "transactionID"
-	FromTopic      = "fromTopic"
-)
-
-var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
-var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
-
-// requestHandlerChannel represents an interface associated with a channel.  Whenever an event is
-// obtained from that channel, this interface is invoked.  This is used to handle
-// async requests into the Core via the kafka messaging bus.
-type requestHandlerChannel struct {
-	requesthandlerInterface interface{}
-	ch                      <-chan *ic.InterContainerMessage
-}
-
-// transactionChannel represents a combination of a topic and a channel onto which a response received
-// on the kafka bus will be sent
-type transactionChannel struct {
-	topic *Topic
-	ch    chan *ic.InterContainerMessage
-}
-
-type InterContainerProxy interface {
-	Start(ctx context.Context) error
-	Stop(ctx context.Context)
-	GetDefaultTopic() *Topic
-	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
-	InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
-	SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
-	SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
-	UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
-	DeleteTopic(ctx context.Context, topic Topic) error
-	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
-	SendLiveness(ctx context.Context) error
-}
-
-// interContainerProxy represents the messaging proxy
-type interContainerProxy struct {
-	kafkaAddress                   string
-	defaultTopic                   *Topic
-	defaultRequestHandlerInterface interface{}
-	kafkaClient                    Client
-	doneCh                         chan struct{}
-	doneOnce                       sync.Once
-
-	// This map is used to map a topic to an interface and channel.   When a request is received
-	// on that channel (registered to the topic) then that interface is invoked.
-	topicToRequestHandlerChannelMap   map[string]*requestHandlerChannel
-	lockTopicRequestHandlerChannelMap sync.RWMutex
-
-	// This map is used to map a channel to a response topic.   This channel handles all responses on that
-	// channel for that topic and forwards them to the appropriate consumer's channel, using the
-	// transactionIdToChannelMap.
-	topicToResponseChannelMap   map[string]<-chan *ic.InterContainerMessage
-	lockTopicResponseChannelMap sync.RWMutex
-
-	// This map is used to map a transaction to a consumer's channel.  This is used whenever a request has been
-	// sent out and we are waiting for a response.
-	transactionIdToChannelMap     map[string]*transactionChannel
-	lockTransactionIdToChannelMap sync.RWMutex
-}
-
-type InterContainerProxyOption func(*interContainerProxy)
-
-func InterContainerAddress(address string) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.kafkaAddress = address
-	}
-}
-
-func DefaultTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.defaultTopic = topic
-	}
-}
-
-func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.defaultRequestHandlerInterface = handler
-	}
-}
-
-func MsgClient(client Client) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.kafkaClient = client
-	}
-}
-
-func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
-	proxy := &interContainerProxy{
-		kafkaAddress: DefaultKafkaAddress,
-		doneCh:       make(chan struct{}),
-	}
-
-	for _, option := range opts {
-		option(proxy)
-	}
-
-	return proxy
-}
-
-func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
-	return newInterContainerProxy(opts...)
-}
-
-func (kp *interContainerProxy) Start(ctx context.Context) error {
-	logger.Info(ctx, "Starting-Proxy")
-
-	// Kafka MsgClient should already have been created.  If not, output fatal error
-	if kp.kafkaClient == nil {
-		logger.Fatal(ctx, "kafka-client-not-set")
-	}
-
-	// Start the kafka client
-	if err := kp.kafkaClient.Start(ctx); err != nil {
-		logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
-		return err
-	}
-
-	// Create the topic to response channel map
-	kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
-	//
-	// Create the transactionId to Channel Map
-	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
-
-	// Create the topic to request channel map
-	kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
-
-	return nil
-}
-
-func (kp *interContainerProxy) Stop(ctx context.Context) {
-	logger.Info(ctx, "stopping-intercontainer-proxy")
-	kp.doneOnce.Do(func() { close(kp.doneCh) })
-	// TODO : Perform cleanup
-	kp.kafkaClient.Stop(ctx)
-	err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
-	if err != nil {
-		logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
-	}
-	err = kp.deleteAllTopicResponseChannelMap(ctx)
-	if err != nil {
-		logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
-	}
-	kp.deleteAllTransactionIdToChannelMap(ctx)
-}
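
A minimal lifecycle sketch for the proxy above, assuming a kafka.Client has already been constructed elsewhere; the address and topic values are placeholders.

package example

import (
	"context"

	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
)

// startProxy wires an existing Kafka client into an InterContainerProxy and
// starts it; the caller is expected to invoke Stop on shutdown.
func startProxy(ctx context.Context, kc kafka.Client) (kafka.InterContainerProxy, error) {
	proxy := kafka.NewInterContainerProxy(
		kafka.InterContainerAddress("voltha-kafka:9092"), // placeholder address
		kafka.DefaultTopic(&kafka.Topic{Name: "rwcore"}), // placeholder topic
		kafka.MsgClient(kc),
	)
	if err := proxy.Start(ctx); err != nil {
		return nil, err
	}
	return proxy, nil
}
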
-
-func (kp *interContainerProxy) GetDefaultTopic() *Topic {
-	return kp.defaultTopic
-}
-
-// InvokeAsyncRPC is used to make an RPC request asynchronously
-func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
-	waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
-
-	spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
-	if spanArg != nil {
-		kvArgs = append(kvArgs, &spanArg[0])
-	}
-
-	defer span.Finish()
-
-	logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
-
-	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
-	// typically the device ID.
-	responseTopic := replyToTopic
-	if responseTopic == nil {
-		responseTopic = kp.GetDefaultTopic()
-	}
-
-	chnl := make(chan *RpcResponse)
-
-	go func() {
-
-		// once we're done,
-		// close the response channel
-		defer close(chnl)
-
-		var err error
-		var protoRequest *ic.InterContainerMessage
-
-		// Encode the request
-		protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
-		if err != nil {
-			logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
-			log.MarkSpanError(ctx, errors.New("cannot-format-request"))
-			chnl <- NewResponse(RpcFormattingError, err, nil)
-			return
-		}
-
-		// Subscribe for response, if needed, before sending request
-		var ch <-chan *ic.InterContainerMessage
-		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
-			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
-			log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
-			chnl <- NewResponse(RpcTransportError, err, nil)
-			return
-		}
-
-		// Send request - if the topic is formatted with a device Id then we will send the request using a
-		// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
-		// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
-		logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
-
-		// if the message cannot be sent on kafka, publish an error response and close the channel
-		if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
-			chnl <- NewResponse(RpcTransportError, err, nil)
-			return
-		}
-
-		// if the client is not waiting for a response send the ack and close the channel
-		chnl <- NewResponse(RpcSent, nil, nil)
-		if !waitForResponse {
-			return
-		}
-
-		defer func() {
-			// Remove the subscription for a response on return
-			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
-				logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
-			}
-		}()
-
-		// Wait for response as well as timeout or cancellation
-		select {
-		case msg, ok := <-ch:
-			if !ok {
-				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
-				log.MarkSpanError(ctx, errors.New("channel-closed"))
-				chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
-			}
-			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			if responseBody, err := decodeResponse(ctx, msg); err != nil {
-				chnl <- NewResponse(RpcReply, err, nil)
-			} else {
-				if responseBody.Success {
-					chnl <- NewResponse(RpcReply, nil, responseBody.Result)
-				} else {
-					// response body contains an error
-					unpackErr := &ic.Error{}
-					if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
-						chnl <- NewResponse(RpcReply, err, nil)
-					} else {
-						chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
-					}
-				}
-			}
-		case <-ctx.Done():
-			logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
-			log.MarkSpanError(ctx, errors.New("context-cancelled"))
-			err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
-			chnl <- NewResponse(RpcTimeout, err, nil)
-		case <-kp.doneCh:
-			chnl <- NewResponse(RpcSystemClosing, nil, nil)
-			logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
-		}
-	}()
-	return chnl
-}
-
-// Method to extract the OpenTracing Span from the Context and serialize it for transport over Kafka, embedded as an additional argument.
-// The additional argument is injected using "span" as the key and the Span marshalled into a byte slice as the value.
-//
-// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
-// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
-// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
-// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
-// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
-func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
-	var err error
-	var newCtx context.Context
-	var spanToInject opentracing.Span
-
-	if !log.GetGlobalLFM().GetLogCorrelationStatus() && !log.GetGlobalLFM().GetTracePublishingStatus() {
-		// if both log correlation and trace publishing are disabled, do not generate the span
-		logger.Debugw(ctx, "not-embedding-span-in-KVArg-", log.Fields{"rpc": rpc,
-			"log-correlation-status": log.GetGlobalLFM().GetLogCorrelationStatus(), "trace-publishing-status": log.GetGlobalLFM().GetTracePublishingStatus()})
-		return nil, opentracing.GlobalTracer().StartSpan(rpc), ctx
-	}
-
-	var spanName strings.Builder
-	spanName.WriteString("kafka-")
-
-	// In case of inter adapter message, use Msg Type for constructing RPC name
-	if rpc == "process_inter_adapter_message" {
-		if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
-			spanName.WriteString("inter-adapter-")
-			rpc = msgType.String()
-		}
-	}
-
-	if isAsync {
-		spanName.WriteString("async-rpc-")
-	} else {
-		spanName.WriteString("rpc-")
-	}
-	spanName.WriteString(rpc)
-
-	if isAsync {
-		spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
-	} else {
-		spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
-	}
-
-	spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
-
-	textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
-	if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
-		logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
-		return nil, spanToInject, newCtx
-	}
-
-	var textMapJson []byte
-	if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
-		logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
-		return nil, spanToInject, newCtx
-	}
-
-	spanArg := make([]KVArg, 1)
-	spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
-	return spanArg, spanToInject, newCtx
-}
-
-// InvokeRPC is used to send a request to a given topic
-func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
-	waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
-
-	spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
-	if spanArg != nil {
-		kvArgs = append(kvArgs, &spanArg[0])
-	}
-
-	defer span.Finish()
-
-	logger.Debugw(ctx, "InvokeRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
-
-	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
-	// typically the device ID.
-	responseTopic := replyToTopic
-	if responseTopic == nil {
-		responseTopic = kp.defaultTopic
-	}
-
-	// Encode the request
-	protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
-	if err != nil {
-		logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
-		log.MarkSpanError(ctx, errors.New("cannot-format-request"))
-		return false, nil
-	}
-
-	// Subscribe for response, if needed, before sending request
-	var ch <-chan *ic.InterContainerMessage
-	if waitForResponse {
-		var err error
-		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
-			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
-		}
-	}
-
-	// Send request - if the topic is formatted with a device Id then we will send the request using a
-	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
-	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
-	//key := GetDeviceIdFromTopic(*toTopic)
-	logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
-	go func() {
-		if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
-			log.MarkSpanError(ctx, errors.New("send-failed"))
-			logger.Errorw(ctx, "send-failed", log.Fields{
-				"topic": toTopic,
-				"key":   key,
-				"error": err})
-		}
-	}()
-
-	if waitForResponse {
-		// Create a child context based on the parent context, if any
-		var cancel context.CancelFunc
-		childCtx := context.Background()
-		if ctx == nil {
-			ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
-		} else {
-			childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
-		}
-		defer cancel()
-
-		// Wait for response as well as timeout or cancellation
-		// Remove the subscription for a response on return
-		defer func() {
-			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
-				logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
-					"id":    protoRequest.Header.Id,
-					"error": err})
-			}
-		}()
-		select {
-		case msg, ok := <-ch:
-			if !ok {
-				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
-				log.MarkSpanError(ctx, errors.New("channel-closed"))
-				protoError := &ic.Error{Reason: "channel-closed"}
-				var marshalledArg *any.Any
-				if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
-					return false, nil // Should never happen
-				}
-				return false, marshalledArg
-			}
-			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			var responseBody *ic.InterContainerResponseBody
-			var err error
-			if responseBody, err = decodeResponse(ctx, msg); err != nil {
-				logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
-				// FIXME we should return something
-			}
-			return responseBody.Success, responseBody.Result
-		case <-ctx.Done():
-			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
-			log.MarkSpanError(ctx, errors.New("context-cancelled"))
-			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
-
-			var marshalledArg *any.Any
-			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
-				return false, nil // Should never happen
-			}
-			return false, marshalledArg
-		case <-childCtx.Done():
-			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
-			log.MarkSpanError(ctx, errors.New("context-cancelled"))
-			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
-
-			var marshalledArg *any.Any
-			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
-				return false, nil // Should never happen
-			}
-			return false, marshalledArg
-		case <-kp.doneCh:
-			logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
-			return true, nil
-		}
-	}
-	return true, nil
-}
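
On the calling side, InvokeRPC blocks (when waitForResponse is true) until the peer answers or the timeout fires, then returns the success flag and the raw *any.Any result for the caller to unmarshal. A sketch of a synchronous call; the RPC name, argument key and payload type are placeholders that the remote handler would have to understand.

package example

import (
	"context"
	"errors"

	"github.com/golang/protobuf/ptypes"
	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
	"github.com/opencord/voltha-protos/v4/go/voltha"
)

// getDeviceOverKafka issues a blocking RPC on the core topic and decodes the reply.
func getDeviceOverKafka(ctx context.Context, icp kafka.InterContainerProxy,
	toTopic, replyTopic kafka.Topic, deviceID string) (*voltha.Device, error) {
	args := []*kafka.KVArg{
		{Key: "device_id", Value: &voltha.ID{Id: deviceID}}, // placeholder key and payload type
	}
	// The device ID doubles as the partition key so per-device ordering is preserved.
	success, result := icp.InvokeRPC(ctx, "GetDevice", &toTopic, &replyTopic, true, deviceID, args...)
	if !success {
		return nil, errors.New("rpc-failed")
	}
	device := &voltha.Device{}
	if err := ptypes.UnmarshalAny(result, device); err != nil {
		return nil, err
	}
	return device, nil
}
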
-
-// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
-// when a message is received on a given topic
-func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
-
-	// Subscribe to receive messages for that topic
-	var ch <-chan *ic.InterContainerMessage
-	var err error
-	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
-		//if ch, err = kp.Subscribe(topic); err != nil {
-		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
-		return err
-	}
-
-	kp.defaultRequestHandlerInterface = handler
-	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
-	// Launch a go routine to receive and process kafka messages
-	go kp.waitForMessages(ctx, ch, topic, handler)
-
-	return nil
-}
-
-// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
-// when a message is received on a given topic.  So far there is only 1 target registered per microservice
-func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
-	// Subscribe to receive messages for that topic
-	var ch <-chan *ic.InterContainerMessage
-	var err error
-	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
-		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
-		return err
-	}
-	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
-
-	// Launch a go routine to receive and process kafka messages
-	go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
-
-	return nil
-}
-
-func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
-	return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
-}
-
-func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
-	kp.lockTopicResponseChannelMap.Lock()
-	defer kp.lockTopicResponseChannelMap.Unlock()
-	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
-		// Unsubscribe from this topic first - this will close the subscribed channel
-		var err error
-		if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
-			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
-		}
-		delete(kp.topicToResponseChannelMap, topic)
-		return err
-	} else {
-		return fmt.Errorf("%s-Topic-not-found", topic)
-	}
-}
-
-// nolint: unused
-func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
-	logger.Debug(ctx, "delete-all-topic-response-channel")
-	kp.lockTopicResponseChannelMap.Lock()
-	defer kp.lockTopicResponseChannelMap.Unlock()
-	var unsubscribeFailTopics []string
-	for topic := range kp.topicToResponseChannelMap {
-		// Unsubscribe from this topic first - this will close the subscribed channel
-		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
-			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
-			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
-			// Do not return. Continue to try to unsubscribe from the other topics.
-		} else {
-			// Only delete from channel map if successfully unsubscribed.
-			delete(kp.topicToResponseChannelMap, topic)
-		}
-	}
-	if len(unsubscribeFailTopics) > 0 {
-		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
-	}
-	return nil
-}
-
-func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
-	kp.lockTopicRequestHandlerChannelMap.Lock()
-	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
-		kp.topicToRequestHandlerChannelMap[topic] = arg
-	}
-}
-
-func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
-	kp.lockTopicRequestHandlerChannelMap.Lock()
-	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
-		// Close the kafka client first by unsubscribing from this topic
-		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
-			return err
-		}
-		delete(kp.topicToRequestHandlerChannelMap, topic)
-		return nil
-	} else {
-		return fmt.Errorf("%s-Topic-not-found", topic)
-	}
-}
-
-// nolint: unused
-func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
-	logger.Debug(ctx, "delete-all-topic-request-channel")
-	kp.lockTopicRequestHandlerChannelMap.Lock()
-	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	var unsubscribeFailTopics []string
-	for topic := range kp.topicToRequestHandlerChannelMap {
-		// Close the kafka client first by unsubscribing from this topic
-		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
-			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
-			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
-			// Do not return. Continue to try to unsubscribe from the other topics.
-		} else {
-			// Only delete from channel map if successfully unsubscribed.
-			delete(kp.topicToRequestHandlerChannelMap, topic)
-		}
-	}
-	if len(unsubscribeFailTopics) > 0 {
-		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
-	}
-	return nil
-}
-
-func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
-	kp.lockTransactionIdToChannelMap.Lock()
-	defer kp.lockTransactionIdToChannelMap.Unlock()
-	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
-		kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
-	}
-}
-
-func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
-	kp.lockTransactionIdToChannelMap.Lock()
-	defer kp.lockTransactionIdToChannelMap.Unlock()
-	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
-		// Close the channel first
-		close(transChannel.ch)
-		delete(kp.transactionIdToChannelMap, id)
-	}
-}
-
-func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
-	kp.lockTransactionIdToChannelMap.Lock()
-	defer kp.lockTransactionIdToChannelMap.Unlock()
-	for key, value := range kp.transactionIdToChannelMap {
-		if value.topic.Name == id {
-			close(value.ch)
-			delete(kp.transactionIdToChannelMap, key)
-		}
-	}
-}
-
-// nolint: unused
-func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
-	logger.Debug(ctx, "delete-all-transaction-id-channel-map")
-	kp.lockTransactionIdToChannelMap.Lock()
-	defer kp.lockTransactionIdToChannelMap.Unlock()
-	for key, value := range kp.transactionIdToChannelMap {
-		close(value.ch)
-		delete(kp.transactionIdToChannelMap, key)
-	}
-}
-
-func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
-	// If we have any consumers on that topic we need to close them
-	if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
-		logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
-	}
-	if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
-		logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
-	}
-	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
-
-	return kp.kafkaClient.DeleteTopic(ctx, &topic)
-}
-
-func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
-	// Encode the response argument - needs to be a proto message
-	if returnedVal == nil {
-		return nil, nil
-	}
-	protoValue, ok := returnedVal.(proto.Message)
-	if !ok {
-		logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
-		err := errors.New("response-value-not-proto-message")
-		return nil, err
-	}
-
-	// Marshal the returned value, if any
-	var marshalledReturnedVal *any.Any
-	var err error
-	if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
-		logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
-		return nil, err
-	}
-	return marshalledReturnedVal, nil
-}
-
-func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
-	responseHeader := &ic.Header{
-		Id:        request.Header.Id,
-		Type:      ic.MessageType_RESPONSE,
-		FromTopic: request.Header.ToTopic,
-		ToTopic:   request.Header.FromTopic,
-		Timestamp: ptypes.TimestampNow(),
-	}
-	responseBody := &ic.InterContainerResponseBody{
-		Success: false,
-		Result:  nil,
-	}
-	var marshalledResponseBody *any.Any
-	var err error
-	// Error should never happen here
-	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
-	}
-
-	return &ic.InterContainerMessage{
-		Header: responseHeader,
-		Body:   marshalledResponseBody,
-	}
-
-}
-
-//encodeResponse formats a response to send over kafka and returns an InterContainerMessage on success
-//or an error on failure
-func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
-	//logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
-	responseHeader := &ic.Header{
-		Id:        request.Header.Id,
-		Type:      ic.MessageType_RESPONSE,
-		FromTopic: request.Header.ToTopic,
-		ToTopic:   request.Header.FromTopic,
-		KeyTopic:  request.Header.KeyTopic,
-		Timestamp: ptypes.TimestampNow(),
-	}
-
-	// Go over all returned values
-	var marshalledReturnedVal *any.Any
-	var err error
-
-	// for now we support only 1 returned value - (excluding the error)
-	if len(returnedValues) > 0 {
-		if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
-			logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
-		}
-	}
-
-	responseBody := &ic.InterContainerResponseBody{
-		Success: success,
-		Result:  marshalledReturnedVal,
-	}
-
-	// Marshal the response body
-	var marshalledResponseBody *any.Any
-	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
-		logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
-		return nil, err
-	}
-
-	return &ic.InterContainerMessage{
-		Header: responseHeader,
-		Body:   marshalledResponseBody,
-	}, nil
-}
-
-func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
-	myClassValue := reflect.ValueOf(myClass)
-	// Capitalize the first letter of funcName to work around the requirement that a method must be
-	// exported (start with a capital letter) to be invoked from a different package
-	funcName = strings.Title(funcName)
-	m := myClassValue.MethodByName(funcName)
-	if !m.IsValid() {
-		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
-	}
-	in := make([]reflect.Value, len(params)+1)
-	in[0] = reflect.ValueOf(ctx)
-	for i, param := range params {
-		in[i+1] = reflect.ValueOf(param)
-	}
-	out = m.Call(in)
-	return
-}
-
-func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
-	arg := &KVArg{
-		Key:   TransactionKey,
-		Value: &ic.StrType{Val: transactionId},
-	}
-
-	var marshalledArg *any.Any
-	var err error
-	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
-		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
-		return currentArgs
-	}
-	protoArg := &ic.Argument{
-		Key:   arg.Key,
-		Value: marshalledArg,
-	}
-	return append(currentArgs, protoArg)
-}
-
-func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
-	var marshalledArg *any.Any
-	var err error
-	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
-		logger.Warnw(ctx, "cannot-add-fromTopic", log.Fields{"error": err})
-		return currentArgs
-	}
-	protoArg := &ic.Argument{
-		Key:   FromTopic,
-		Value: marshalledArg,
-	}
-	return append(currentArgs, protoArg)
-}
-
-// enrichContextWithSpan extracts the Span embedded in a Kafka RPC request on the receiver side. If a span is found embedded in the KV args (with key "span"),
-// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
-// If no span is found embedded, a span is still created, named "kafka-rpc-<rpc-name>", to enrich the Context for RPC calls coming
-// from components that do not yet send a span (e.g. the openonu adapter)
-func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
-
-	for _, arg := range args {
-		if arg.Key == "span" {
-			var err error
-			var textMapString ic.StrType
-			if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
-				logger.Debug(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
-				break
-			}
-
-			spanTextMap := make(map[string]string)
-			if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
-				logger.Debug(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
-				break
-			}
-
-			var spanContext opentracing.SpanContext
-			if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
-				logger.Debug(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
-				break
-			}
-
-			var receivedRpcName string
-			extractBaggage := func(k, v string) bool {
-				if k == "rpc-span-name" {
-					receivedRpcName = v
-					return false
-				}
-				return true
-			}
-
-			spanContext.ForeachBaggageItem(extractBaggage)
-
-			return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
-		}
-	}
-
-	// Create new Child Span with rpc as name if no span details were received in kafka arguments
-	var spanName strings.Builder
-	spanName.WriteString("kafka-")
-
-	// In case of inter adapter message, use Msg Type for constructing RPC name
-	if rpcName == "process_inter_adapter_message" {
-		for _, arg := range args {
-			if arg.Key == "msg" {
-				iamsg := ic.InterAdapterMessage{}
-				if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
-					spanName.WriteString("inter-adapter-")
-					rpcName = iamsg.Header.Type.String()
-				}
-			}
-		}
-	}
-
-	spanName.WriteString("rpc-")
-	spanName.WriteString(rpcName)
-
-	return opentracing.StartSpanFromContext(ctx, spanName.String())
-}
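For context on the span convention unpacked above: the sender embeds the serialized span as a KV argument keyed "span" and carries the span name in the "rpc-span-name" baggage item. The following sender-side sketch is reconstructed from the receiver logic and is not part of this change; the helper name and baggage value are illustrative.

package example

import (
	"encoding/json"

	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
	"github.com/opentracing/opentracing-go"
)

// embedSpan serializes span into the "span" KV argument that enrichContextWithSpan unpacks.
func embedSpan(span opentracing.Span, rpc string, args []*kafka.KVArg) []*kafka.KVArg {
	// The receiver reads the span name back from this baggage item.
	span.SetBaggageItem("rpc-span-name", "kafka-rpc-"+rpc)

	carrier := opentracing.TextMapCarrier(make(map[string]string))
	if err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, carrier); err != nil {
		return args
	}
	textMap, err := json.Marshal(carrier)
	if err != nil {
		return args
	}
	// ic.StrType satisfies proto.Message, so encodeRequest can marshal it like any other argument.
	return append(args, &kafka.KVArg{Key: "span", Value: &ic.StrType{Val: string(textMap)}})
}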
-
-func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
-
-	// First extract the header to know whether this is a request - responses are handled by a different handler
-	if msg.Header.Type == ic.MessageType_REQUEST {
-		var out []reflect.Value
-		var err error
-
-		// Get the request body
-		requestBody := &ic.InterContainerRequestBody{}
-		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
-			logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
-		} else {
-			logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "args": requestBody.Args})
-			span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
-			defer span.Finish()
-
-			// let the callee unpack the arguments as it's the only one that knows the real proto type
-			// Augment the requestBody with the message Id as it will be used in scenarios where cores
-			// are set in pairs and competing
-			requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
-
-			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
-			// needs to send an unsolicited message to the currently requesting container
-			requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
-
-			out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
-			if err != nil {
-				logger.Warn(ctx, err)
-			}
-		}
-		// Response required?
-		if requestBody.ResponseRequired {
-			// If we already have an error before then just return that
-			var returnError *ic.Error
-			var returnedValues []interface{}
-			var success bool
-			if err != nil {
-				returnError = &ic.Error{Reason: err.Error()}
-				returnedValues = make([]interface{}, 1)
-				returnedValues[0] = returnError
-			} else {
-				returnedValues = make([]interface{}, 0)
-				// Check for errors first
-				lastIndex := len(out) - 1
-				if out[lastIndex].Interface() != nil { // Error
-					if retError, ok := out[lastIndex].Interface().(error); ok {
-						if retError.Error() == ErrorTransactionNotAcquired.Error() {
-							logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
-							return // Ignore - process is in competing mode and ignored transaction
-						}
-						returnError = &ic.Error{Reason: retError.Error()}
-						returnedValues = append(returnedValues, returnError)
-					} else { // Should never happen
-						returnError = &ic.Error{Reason: "incorrect-error-returns"}
-						returnedValues = append(returnedValues, returnError)
-					}
-				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
-					logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
-					return // Ignore - should not happen
-				} else { // Non-error case
-					success = true
-					for idx, val := range out {
-						//logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
-						if idx != lastIndex {
-							returnedValues = append(returnedValues, val.Interface())
-						}
-					}
-				}
-			}
-
-			var icm *ic.InterContainerMessage
-			if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
-				logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
-				icm = encodeDefaultFailedResponse(ctx, msg)
-			}
-			// To preserve ordering of messages, all messages for a given topic are sent to the same partition
-			// by providing a message key.  The key is encoded in the topic name.  If the deviceId is not
-			// present then the key will be empty, hence messages for that topic may be spread across
-			// partitions.
-			replyTopic := &Topic{Name: msg.Header.FromTopic}
-			key := msg.Header.KeyTopic
-			logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
-			// TODO: handle error response.
-			go func() {
-				if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
-					logger.Errorw(ctx, "send-reply-failed", log.Fields{
-						"topic": replyTopic,
-						"key":   key,
-						"error": err})
-				}
-			}()
-		}
-	} else if msg.Header.Type == ic.MessageType_RESPONSE {
-		logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
-		go kp.dispatchResponse(ctx, msg)
-	} else {
-		logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
-	}
-}
-
-func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
-	//	Wait for messages
-	for msg := range ch {
-		//logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
-		go kp.handleMessage(context.Background(), msg, targetInterface)
-	}
-}
-
-func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
-	kp.lockTransactionIdToChannelMap.RLock()
-	defer kp.lockTransactionIdToChannelMap.RUnlock()
-	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
-		logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
-		return
-	}
-	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
-}
-
-// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
-// This method is built to prevent all subscribers from receiving all messages, as is the case with the Subscribe
-// API. A single response channel waits for kafka messages and dispatches each message to the
-// corresponding waiting channel
-func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
-	logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
-
-	// Create a specific channel for this consumer.  We cannot use the channel from the kafkaclient as it will
-	// broadcast any message for this topic to all channels waiting on it.
-	// Set channel size to 1 to prevent deadlock, see VOL-2708
-	ch := make(chan *ic.InterContainerMessage, 1)
-	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
-
-	return ch, nil
-}
-
-func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
-	logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
-	kp.deleteFromTransactionIdToChannelMap(trnsId)
-	return nil
-}
-
-func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
-}
-
-func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
-	return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
-}
-
-func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
-	return kp.kafkaClient.SendLiveness(ctx)
-}
-
-//encodeRequest formats a request to send over kafka and returns an InterContainerMessage on success
-//or an error on failure
-func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
-	requestHeader := &ic.Header{
-		Id:        uuid.New().String(),
-		Type:      ic.MessageType_REQUEST,
-		FromTopic: replyTopic.Name,
-		ToTopic:   toTopic.Name,
-		KeyTopic:  key,
-		Timestamp: ptypes.TimestampNow(),
-	}
-	requestBody := &ic.InterContainerRequestBody{
-		Rpc:              rpc,
-		ResponseRequired: true,
-		ReplyToTopic:     replyTopic.Name,
-	}
-
-	for _, arg := range kvArgs {
-		if arg == nil {
-			// In case the caller sends an array with empty args
-			continue
-		}
-		var marshalledArg *any.Any
-		var err error
-		// ascertain the value interface type is a proto.Message
-		protoValue, ok := arg.Value.(proto.Message)
-		if !ok {
-			logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
-			err := errors.New("argument-value-not-proto-message")
-			return nil, err
-		}
-		if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
-			logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
-			return nil, err
-		}
-		protoArg := &ic.Argument{
-			Key:   arg.Key,
-			Value: marshalledArg,
-		}
-		requestBody.Args = append(requestBody.Args, protoArg)
-	}
-
-	var marshalledData *any.Any
-	var err error
-	if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
-		logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
-		return nil, err
-	}
-	request := &ic.InterContainerMessage{
-		Header: requestHeader,
-		Body:   marshalledData,
-	}
-	return request, nil
-}
-
-func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
-	//	Extract the message body
-	responseBody := ic.InterContainerResponseBody{}
-	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
-		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
-		return nil, err
-	}
-	//logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
-
-	return &responseBody, nil
-
-}
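Taken together, the removed proxy implemented a request/response pattern over kafka: encode the request, send it keyed by device, wait on a per-transaction channel for the reply, and decode it. A minimal caller-side sketch of that flow follows; the topic names, rpc string and argument key are placeholders, not taken from this change.

package example

import (
	"context"
	"errors"
	"time"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/any"
	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
)

// invokeGetDevice issues one RPC through the (now removed) inter-container proxy and
// returns the raw Any result, or the decoded ic.Error reason on failure.
func invokeGetDevice(ctx context.Context, proxy kafka.InterContainerProxy, deviceID string) (*any.Any, error) {
	toTopic := kafka.Topic{Name: "rwcore"}
	replyTopic := kafka.Topic{Name: "adapter_" + deviceID}

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// The key keeps all messages for one device on the same partition, preserving ordering.
	success, result := proxy.InvokeRPC(ctx, "get_device", &toTopic, &replyTopic, true, deviceID,
		&kafka.KVArg{Key: "device_id", Value: &ic.StrType{Val: deviceID}})
	if success {
		return result, nil
	}
	unpacked := &ic.Error{}
	if err := ptypes.UnmarshalAny(result, unpacked); err != nil {
		return nil, err
	}
	return nil, errors.New(unpacked.Reason)
}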
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/utils.go
deleted file mode 100644
index bdc615f..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/utils.go
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
-	"github.com/golang/protobuf/ptypes/any"
-	"strings"
-)
-
-const (
-	TopicSeparator = "_"
-	DeviceIdLength = 24
-)
-
-// A Topic definition - may be augmented with additional attributes eventually
-type Topic struct {
-	// The name of the topic. It must start with a letter,
-	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
-	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
-	// signs (`%`).
-	Name string
-}
-
-type KVArg struct {
-	Key   string
-	Value interface{}
-}
-
-type RpcMType int
-
-const (
-	RpcFormattingError RpcMType = iota
-	RpcSent
-	RpcReply
-	RpcTimeout
-	RpcTransportError
-	RpcSystemClosing
-)
-
-type RpcResponse struct {
-	MType RpcMType
-	Err   error
-	Reply *any.Any
-}
-
-func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
-	return &RpcResponse{
-		MType: messageType,
-		Err:   err,
-		Reply: body,
-	}
-}
-
-// TODO:  Remove and provide a better way to get the device id
-// GetDeviceIdFromTopic extracts the deviceId from the topic name.  The topic name is formatted either as:
-//			<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
-func GetDeviceIdFromTopic(topic Topic) string {
-	pos := strings.LastIndex(topic.Name, TopicSeparator)
-	if pos == -1 {
-		return ""
-	}
-	adjustedPos := pos + len(TopicSeparator)
-	if adjustedPos >= len(topic.Name) {
-		return ""
-	}
-	deviceId := topic.Name[adjustedPos:len(topic.Name)]
-	if len(deviceId) != DeviceIdLength {
-		return ""
-	}
-	return deviceId
-}
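The helper being removed assumed a per-device topic of the form <prefix>_<deviceId> with a 24-character id. A short usage sketch of that behaviour (the device id below is illustrative only):

package example

import (
	"fmt"

	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
)

func main() {
	// "<prefix>_<deviceId>" with a 24-character device id yields the id.
	t := kafka.Topic{Name: "openolt_0123456789abcdef01234567"}
	fmt.Println(kafka.GetDeviceIdFromTopic(t)) // 0123456789abcdef01234567

	// Topics without a 24-character suffix yield an empty string.
	fmt.Println(kafka.GetDeviceIdFromTopic(kafka.Topic{Name: "rwcore"})) // ""
}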
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/endpoint_manager.go
deleted file mode 100644
index 5acec69..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/endpoint_manager.go
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import (
-	"context"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-)
-
-type EndpointManager struct{}
-
-func NewEndpointManager() kafka.EndpointManager {
-	mock := &EndpointManager{}
-	return mock
-}
-
-func (em *EndpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (kafka.Endpoint, error) {
-	// TODO add mocks call and args
-	return kafka.Endpoint(serviceType), nil
-}
-
-func (em *EndpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
-	// TODO add mocks call and args
-	return true, nil
-}
-
-func (em *EndpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (kafka.ReplicaID, error) {
-	return kafka.ReplicaID(1), nil
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_inter_container_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_inter_container_proxy.go
deleted file mode 100644
index a028aa9..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import (
-	"context"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/any"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-)
-
-type InvokeRpcArgs struct {
-	Rpc             string
-	ToTopic         *kafka.Topic
-	ReplyToTopic    *kafka.Topic
-	WaitForResponse bool
-	Key             string
-	ParentDeviceId  string
-	KvArgs          map[int]interface{}
-}
-
-type InvokeRpcSpy struct {
-	CallCount int
-	Calls     map[int]InvokeRpcArgs
-	Timeout   bool
-	Response  proto.Message
-}
-
-type InvokeAsyncRpcSpy struct {
-	CallCount int
-	Calls     map[int]InvokeRpcArgs
-	Timeout   bool
-	Response  proto.Message
-}
-
-type MockKafkaICProxy struct {
-	InvokeRpcSpy InvokeRpcSpy
-}
-
-func (s *MockKafkaICProxy) Start(ctx context.Context) error { return nil }
-func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
-	t := kafka.Topic{
-		Name: "test-topic",
-	}
-	return &t
-}
-
-func (s *MockKafkaICProxy) DeleteTopic(ctx context.Context, topic kafka.Topic) error { return nil }
-
-func (s *MockKafkaICProxy) Stop(ctx context.Context) {}
-
-func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
-	waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
-
-	args := make(map[int]interface{}, 4)
-	for k, v := range kvArgs {
-		args[k] = v
-	}
-
-	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
-		Rpc:             rpc,
-		ToTopic:         toTopic,
-		ReplyToTopic:    replyToTopic,
-		WaitForResponse: waitForResponse,
-		Key:             key,
-		KvArgs:          args,
-	}
-
-	chnl := make(chan *kafka.RpcResponse)
-
-	return chnl
-}
-
-func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
-	s.InvokeRpcSpy.CallCount++
-
-	success := true
-
-	args := make(map[int]interface{}, 4)
-	for k, v := range kvArgs {
-		args[k] = v
-	}
-
-	s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
-		Rpc:             rpc,
-		ToTopic:         toTopic,
-		ReplyToTopic:    replyToTopic,
-		WaitForResponse: waitForResponse,
-		Key:             key,
-		KvArgs:          args,
-	}
-
-	var response any.Any
-	if s.InvokeRpcSpy.Timeout {
-
-		success = false
-
-		err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
-		res, _ := ptypes.MarshalAny(err)
-		response = *res
-	} else {
-		res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
-		response = *res
-	}
-
-	return success, &response
-}
-func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic kafka.Topic, handler interface{}) error {
-	return nil
-}
-func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic kafka.Topic, initialOffset int64) error {
-	return nil
-}
-func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic kafka.Topic) error {
-	return nil
-}
-func (s *MockKafkaICProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	return nil
-}
-func (s *MockKafkaICProxy) SendLiveness(ctx context.Context) error { return nil }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/performance_metrics.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/adapters/common/performance_metrics.go
similarity index 97%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/performance_metrics.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/adapters/common/performance_metrics.go
index 3b6d4f9..524eeea 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/performance_metrics.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/adapters/common/performance_metrics.go
@@ -17,7 +17,7 @@
 package common
 
 import (
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 type PmMetrics struct {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/adapters/common/utils.go
similarity index 77%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/utils.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/adapters/common/utils.go
index 35f227e..d3c562a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/adapters/common/utils.go
@@ -16,11 +16,7 @@
 package common
 
 import (
-	"context"
 	"fmt"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"google.golang.org/grpc/codes"
 	"math/rand"
 	"time"
 )
@@ -75,17 +71,3 @@
 	}
 	return string(b)
 }
-
-func ICProxyErrorCodeToGrpcErrorCode(ctx context.Context, icErr ic.ErrorCodeCodes) codes.Code {
-	switch icErr {
-	case ic.ErrorCode_INVALID_PARAMETERS:
-		return codes.InvalidArgument
-	case ic.ErrorCode_UNSUPPORTED_REQUEST:
-		return codes.Unavailable
-	case ic.ErrorCode_DEADLINE_EXCEEDED:
-		return codes.DeadlineExceeded
-	default:
-		logger.Warnw(ctx, "cannot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
-		return codes.Internal
-	}
-}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/common.go
index 606d18c..4813ba1 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/common.go
@@ -16,7 +16,7 @@
 package config
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/configmanager.go
similarity index 97%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/configmanager.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/configmanager.go
index f5efa36..c489407 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/configmanager.go
@@ -22,14 +22,14 @@
 	"strings"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v5/pkg/db"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 const (
 	defaultkvStoreConfigPath     = "config"
-	defaultkvStoreDataPathPrefix = "service/voltha_voltha"
+	defaultkvStoreDataPathPrefix = "service/voltha/voltha1_voltha1"
 	kvStorePathSeparator         = "/"
 )
 
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/logcontroller.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/logcontroller.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/logcontroller.go
index 68bfb32..b58f999 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/logcontroller.go
@@ -26,10 +26,11 @@
 	"crypto/md5"
 	"encoding/json"
 	"errors"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
 	"os"
 	"sort"
 	"strings"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/logfeaturescontroller.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/logfeaturescontroller.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/logfeaturescontroller.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/logfeaturescontroller.go
index 95c5bde..579c1de 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/config/logfeaturescontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/config/logfeaturescontroller.go
@@ -19,9 +19,10 @@
 import (
 	"context"
 	"errors"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
 	"os"
 	"strings"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/backend.go
similarity index 98%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/backend.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/backend.go
index ff0b5b7..2e57a27 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/backend.go
@@ -23,8 +23,8 @@
 	"sync"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/common.go
index 4bc92b1..d8a0571 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/common.go
@@ -16,7 +16,7 @@
 package db
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/client.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/client.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/client.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/common.go
index b8509db..8ac2a4a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/common.go
@@ -16,7 +16,7 @@
 package kvstore
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/etcdclient.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/etcdclient.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/etcdclient.go
index 96ffc2f..6ca5329 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/etcdclient.go
@@ -19,13 +19,14 @@
 	"context"
 	"errors"
 	"fmt"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	v3Client "go.etcd.io/etcd/clientv3"
-	v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"os"
 	"strconv"
 	"sync"
 	"time"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	v3Client "go.etcd.io/etcd/clientv3"
+	v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 )
 
 const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/etcdpool.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/etcdpool.go
similarity index 98%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/etcdpool.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/etcdpool.go
index 6af7d3d..4d33c27 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/etcdpool.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/etcdpool.go
@@ -19,10 +19,11 @@
 	"container/list"
 	"context"
 	"errors"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"go.etcd.io/etcd/clientv3"
 	"sync"
 	"time"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"go.etcd.io/etcd/clientv3"
 )
 
 // EtcdClientAllocator represents a generic interface to allocate an Etcd Client
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/kvutils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/kvutils.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore/kvutils.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore/kvutils.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/common.go
index df3e839..0f0468e 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/common.go
@@ -16,7 +16,7 @@
 package events
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/eventif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/eventif/events_proxy_if.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
index 35f821f..e4ebc36 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/eventif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
@@ -18,7 +18,8 @@
 
 import (
 	"context"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 // EventProxy interface for eventproxy
@@ -31,6 +32,8 @@
 		subCategory *EventSubCategory, raisedTs int64) error
 	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
 	SendLiveness(ctx context.Context) error
+	Start() error
+	Stop()
 }
 
 const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/events_proxy.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
index 19a4f26..e4493f9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
@@ -26,11 +26,11 @@
 	"sync"
 	"time"
 
-	"github.com/golang/protobuf/ptypes"
-	"github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
 // TODO: Make configurable through helm chart
@@ -97,17 +97,8 @@
 	header.TypeVersion = eventif.EventTypeVersion
 
 	// raisedTs is in seconds
-	timestamp, err := ptypes.TimestampProto(time.Unix(raisedTs, 0))
-	if err != nil {
-		return nil, err
-	}
-	header.RaisedTs = timestamp
-
-	timestamp, err = ptypes.TimestampProto(time.Now())
-	if err != nil {
-		return nil, err
-	}
-	header.ReportedTs = timestamp
+	header.RaisedTs = timestamppb.New(time.Unix(raisedTs, 0))
+	header.ReportedTs = timestamppb.New(time.Now())
 
 	return &header, nil
 }
@@ -201,7 +192,7 @@
 }
 
 // Start the event proxy
-func (ep *EventProxy) Start() {
+func (ep *EventProxy) Start() error {
 	eq := ep.eventQueue
 	go eq.start(ep.queueCtx)
 	logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
@@ -235,10 +226,13 @@
 				"reported-ts": event.Header.ReportedTs, "event-type": event.EventType})
 		}
 	}
+	return nil
 }
 
 func (ep *EventProxy) Stop() {
-	ep.eventQueue.stop()
+	if ep.eventQueue != nil {
+		ep.eventQueue.stop()
+	}
 }
 
 type EventQueue struct {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/utils.go
similarity index 91%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/utils.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/utils.go
index fe3a017..4598161 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/events/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/utils.go
@@ -19,7 +19,8 @@
 	"fmt"
 	"strconv"
 
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 type ContextType string
@@ -64,8 +65,8 @@
 
 //CreateDeviceStateChangeEvent forms and returns a new DeviceStateChange Event
 func CreateDeviceStateChangeEvent(serialNumber string, deviceID string, parentID string,
-	prevOperStatus voltha.OperStatus_Types, prevConnStatus voltha.ConnectStatus_Types, prevAdminStatus voltha.AdminState_Types,
-	operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types, adminStatus voltha.AdminState_Types,
+	prevOperStatus common.OperStatus_Types, prevConnStatus common.ConnectStatus_Types, prevAdminStatus common.AdminState_Types,
+	operStatus common.OperStatus_Types, connStatus common.ConnectStatus_Types, adminStatus common.AdminState_Types,
 	parentPort uint32, isRoot bool) *voltha.DeviceEvent {
 
 	context := make(map[string]string)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/flows/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/flows/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/common.go
index beb0574..2f5ff92 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/flows/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/common.go
@@ -16,7 +16,7 @@
 package flows
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/flows/flow_utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
similarity index 98%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/flows/flow_utils.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
index ff6aaf0..41b615a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/flows/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
@@ -23,11 +23,12 @@
 	"fmt"
 	"hash"
 	"sort"
+	"sync"
 
 	"github.com/cevaris/ordered_map"
-	"github.com/gogo/protobuf/proto"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
 )
 
 var (
@@ -1323,7 +1324,8 @@
 }
 
 type DeviceRules struct {
-	Rules map[string]*FlowsAndGroups
+	Rules     map[string]*FlowsAndGroups
+	rulesLock sync.RWMutex
 }
 
 func NewDeviceRules() *DeviceRules {
@@ -1335,6 +1337,8 @@
 func (dr *DeviceRules) Copy() *DeviceRules {
 	copyDR := NewDeviceRules()
 	if dr != nil {
+		dr.rulesLock.RLock()
+		defer dr.rulesLock.RUnlock()
 		for key, val := range dr.Rules {
 			if val != nil {
 				copyDR.Rules[key] = val.Copy()
@@ -1344,7 +1348,19 @@
 	return copyDR
 }
 
+func (dr *DeviceRules) Keys() []string {
+	dr.rulesLock.RLock()
+	defer dr.rulesLock.RUnlock()
+	keys := make([]string, 0, len(dr.Rules))
+	for k := range dr.Rules {
+		keys = append(keys, k)
+	}
+	return keys
+}
+
 func (dr *DeviceRules) ClearFlows(deviceId string) {
+	dr.rulesLock.Lock()
+	defer dr.rulesLock.Unlock()
 	if _, exist := dr.Rules[deviceId]; exist {
 		dr.Rules[deviceId].Flows = ordered_map.NewOrderedMap()
 	}
@@ -1352,6 +1368,8 @@
 
 func (dr *DeviceRules) FilterRules(deviceIds map[string]string) *DeviceRules {
 	filteredDR := NewDeviceRules()
+	dr.rulesLock.RLock()
+	defer dr.rulesLock.RUnlock()
 	for key, val := range dr.Rules {
 		if _, exist := deviceIds[key]; exist {
 			filteredDR.Rules[key] = val.Copy()
@@ -1361,6 +1379,8 @@
 }
 
 func (dr *DeviceRules) AddFlow(deviceId string, flow *ofp.OfpFlowStats) {
+	dr.rulesLock.Lock()
+	defer dr.rulesLock.Unlock()
 	if _, exist := dr.Rules[deviceId]; !exist {
 		dr.Rules[deviceId] = NewFlowsAndGroups()
 	}
@@ -1368,11 +1388,15 @@
 }
 
 func (dr *DeviceRules) GetRules() map[string]*FlowsAndGroups {
+	dr.rulesLock.RLock()
+	defer dr.rulesLock.RUnlock()
 	return dr.Rules
 }
 
 func (dr *DeviceRules) String() string {
 	var buffer bytes.Buffer
+	dr.rulesLock.RLock()
+	defer dr.rulesLock.RUnlock()
 	for key, value := range dr.Rules {
 		buffer.WriteString("DeviceId:")
 		buffer.WriteString(key)
@@ -1383,6 +1407,8 @@
 }
 
 func (dr *DeviceRules) AddFlowsAndGroup(deviceId string, fg *FlowsAndGroups) {
+	dr.rulesLock.Lock()
+	defer dr.rulesLock.Unlock()
 	if _, ok := dr.Rules[deviceId]; !ok {
 		dr.Rules[deviceId] = NewFlowsAndGroups()
 	}
@@ -1392,6 +1418,8 @@
 // CreateEntryIfNotExist creates a new deviceId in the Map if it does not exist and assigns an
 // empty FlowsAndGroups to it.  Otherwise, it does nothing.
 func (dr *DeviceRules) CreateEntryIfNotExist(deviceId string) {
+	dr.rulesLock.Lock()
+	defer dr.rulesLock.Unlock()
 	if _, ok := dr.Rules[deviceId]; !ok {
 		dr.Rules[deviceId] = NewFlowsAndGroups()
 	}
@@ -1568,7 +1596,7 @@
 	//convert each octet to decimal
 	for i := 0; i < 6; i++ {
 		if i != 0 {
-			catalyze = catalyze >> 8
+			catalyze >>= 8
 		}
 		octet := mac & catalyze
 		octetDecimal := octet >> uint8(40-i*8)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
new file mode 100644
index 0000000..de649d6
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
@@ -0,0 +1,541 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"strings"
+	"sync"
+	"time"
+
+	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/core"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
+)
+
+type event byte
+type state byte
+type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn) interface{}
+type RestartedHandler func(ctx context.Context, endPoint string) error
+
+type contextKey string
+
+func (c contextKey) String() string {
+	return string(c)
+}
+
+var (
+	grpcMonitorContextKey = contextKey("grpc-monitor")
+)
+
+const (
+	grpcBackoffInitialInterval = "GRPC_BACKOFF_INITIAL_INTERVAL"
+	grpcBackoffMaxInterval     = "GRPC_BACKOFF_MAX_INTERVAL"
+	grpcBackoffMaxElapsedTime  = "GRPC_BACKOFF_MAX_ELAPSED_TIME"
+	grpcMonitorInterval        = "GRPC_MONITOR_INTERVAL"
+)
+
+const (
+	DefaultBackoffInitialInterval = 100 * time.Millisecond
+	DefaultBackoffMaxInterval     = 5 * time.Second
+	DefaultBackoffMaxElapsedTime  = 0 * time.Second // No time limit
+	DefaultGRPCMonitorInterval    = 5 * time.Second
+)
+
+const (
+	connectionErrorSubString  = "SubConns are in TransientFailure"
+	connectionClosedSubstring = "client connection is closing"
+	connectionError           = "connection error"
+	connectionSystemNotReady  = "system is not ready"
+)
+
+const (
+	eventConnecting = event(iota)
+	eventConnected
+	eventDisconnected
+	eventStopped
+	eventError
+
+	stateConnected = state(iota)
+	stateConnecting
+	stateDisconnected
+)
+
+type Client struct {
+	apiEndPoint            string
+	connection             *grpc.ClientConn
+	connectionLock         sync.RWMutex
+	stateLock              sync.RWMutex
+	state                  state
+	service                interface{}
+	events                 chan event
+	onRestart              RestartedHandler
+	backoffInitialInterval time.Duration
+	backoffMaxInterval     time.Duration
+	backoffMaxElapsedTime  time.Duration
+	activityCheck          bool
+	monitorInterval        time.Duration
+	activeCh               chan struct{}
+	activeChMutex          sync.RWMutex
+	done                   bool
+	livenessCallback       func(timestamp time.Time)
+}
+
+type ClientOption func(*Client)
+
+func ActivityCheck(enable bool) ClientOption {
+	return func(args *Client) {
+		args.activityCheck = enable
+	}
+}
+
+func NewClient(endpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
+	c := &Client{
+		apiEndPoint:            endpoint,
+		onRestart:              onRestart,
+		events:                 make(chan event, 1),
+		state:                  stateDisconnected,
+		backoffInitialInterval: DefaultBackoffInitialInterval,
+		backoffMaxInterval:     DefaultBackoffMaxInterval,
+		backoffMaxElapsedTime:  DefaultBackoffMaxElapsedTime,
+		monitorInterval:        DefaultGRPCMonitorInterval,
+	}
+	for _, option := range opts {
+		option(c)
+	}
+
+	// Check for environment variables
+	if err := SetFromEnvVariable(grpcBackoffInitialInterval, &c.backoffInitialInterval); err != nil {
+		logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
+	}
+
+	if err := SetFromEnvVariable(grpcBackoffMaxInterval, &c.backoffMaxInterval); err != nil {
+		logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffMaxInterval})
+	}
+
+	if err := SetFromEnvVariable(grpcBackoffMaxElapsedTime, &c.backoffMaxElapsedTime); err != nil {
+		logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffMaxElapsedTime})
+	}
+
+	if err := SetFromEnvVariable(grpcMonitorInterval, &c.monitorInterval); err != nil {
+		logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcMonitorInterval})
+	}
+
+	logger.Infow(context.Background(), "initialized-client", log.Fields{"client": c})
+
+	// Sanity check
+	if c.backoffInitialInterval > c.backoffMaxInterval {
+		return nil, fmt.Errorf("initial retry delay %v is greater than maximum retry delay %v", c.backoffInitialInterval, c.backoffMaxInterval)
+	}
+
+	return c, nil
+}
+
+func (c *Client) GetClient() (interface{}, error) {
+	c.connectionLock.RLock()
+	defer c.connectionLock.RUnlock()
+	if c.service == nil {
+		return nil, fmt.Errorf("no connection to %s", c.apiEndPoint)
+	}
+	return c.service, nil
+}
+
+// GetCoreServiceClient is a helper function that returns a concrete CoreServiceClient, unlike the
+// GetClient() API which returns an interface
+func (c *Client) GetCoreServiceClient() (core.CoreServiceClient, error) {
+	c.connectionLock.RLock()
+	defer c.connectionLock.RUnlock()
+	if c.service == nil {
+		return nil, fmt.Errorf("no core connection to %s", c.apiEndPoint)
+	}
+	client, ok := c.service.(core.CoreServiceClient)
+	if ok {
+		return client, nil
+	}
+	return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
+}
+
+// GetOnuInterAdapterServiceClient is a helper function that returns a concrete OnuInterAdapterServiceClient,
+// unlike the GetClient() API which returns an interface
+func (c *Client) GetOnuInterAdapterServiceClient() (adapter_services.OnuInterAdapterServiceClient, error) {
+	c.connectionLock.RLock()
+	defer c.connectionLock.RUnlock()
+	if c.service == nil {
+		return nil, fmt.Errorf("no child adapter connection to %s", c.apiEndPoint)
+	}
+	client, ok := c.service.(adapter_services.OnuInterAdapterServiceClient)
+	if ok {
+		return client, nil
+	}
+	return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
+}
+
+// GetOltInterAdapterServiceClient is a helper function that returns a concrete OltInterAdapterServiceClient,
+// unlike the GetClient() API which returns an interface
+func (c *Client) GetOltInterAdapterServiceClient() (adapter_services.OltInterAdapterServiceClient, error) {
+	c.connectionLock.RLock()
+	defer c.connectionLock.RUnlock()
+	if c.service == nil {
+		return nil, fmt.Errorf("no parent adapter connection to %s", c.apiEndPoint)
+	}
+	client, ok := c.service.(adapter_services.OltInterAdapterServiceClient)
+	if ok {
+		return client, nil
+	}
+	return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
+}
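+
+// The typed getters above are normally used after Start() has established the connection.  An
+// illustrative sketch, not part of this file, of a caller issuing a request through the core
+// client; the deviceID value is an assumption and the GetDevice/common.ID usage follows the
+// generated voltha-protos API:
+//
+//	coreClient, err := client.GetCoreServiceClient()
+//	if err != nil {
+//		return err // not connected yet
+//	}
+//	device, err := coreClient.GetDevice(ctx, &common.ID{Id: deviceID})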
+
+func (c *Client) Reset(ctx context.Context) {
+	logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.apiEndPoint})
+	c.stateLock.Lock()
+	defer c.stateLock.Unlock()
+	if c.state == stateConnected {
+		c.state = stateDisconnected
+		c.events <- eventDisconnected
+	}
+}
+
+func (c *Client) clientInterceptor(ctx context.Context, method string, req interface{}, reply interface{},
+	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+	// Nothing to do before intercepting the call
+	err := invoker(ctx, method, req, reply, cc, opts...)
+	// On connection failure, start the reconnect process depending on the error response
+	if err != nil {
+		logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.apiEndPoint})
+		if strings.Contains(err.Error(), connectionErrorSubString) ||
+			strings.Contains(err.Error(), connectionError) ||
+			strings.Contains(err.Error(), connectionSystemNotReady) ||
+			isGrpcMonitorKeyPresentInContext(ctx) {
+			c.stateLock.Lock()
+			if c.state == stateConnected {
+				logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+				c.state = stateDisconnected
+				c.events <- eventDisconnected
+			}
+			c.stateLock.Unlock()
+		} else if strings.Contains(err.Error(), connectionClosedSubstring) {
+			logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+		}
+		return err
+	}
+	// Update activity on success only
+	c.updateActivity(ctx)
+	return nil
+}
+
+// updateActivity pushes an activity indication on the channel so that the monitoring routine does not validate
+// the gRPC connection while it is being used. Note that this update is only done when a call succeeds; when a
+// connection error is detected, a separate routine takes care of doing the re-connect.
+func (c *Client) updateActivity(ctx context.Context) {
+	c.activeChMutex.RLock()
+	defer c.activeChMutex.RUnlock()
+	if c.activeCh != nil {
+		logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.apiEndPoint})
+		c.activeCh <- struct{}{}
+
+		// Update liveness only in connected state
+		if c.livenessCallback != nil {
+			c.stateLock.RLock()
+			if c.state == stateConnected {
+				c.livenessCallback(time.Now())
+			}
+			c.stateLock.RUnlock()
+		}
+	}
+}
+
+func WithGrpcMonitorContext(ctx context.Context, name string) context.Context {
+	ctx = context.WithValue(ctx, grpcMonitorContextKey, name)
+	return ctx
+}
+
+func isGrpcMonitorKeyPresentInContext(ctx context.Context) bool {
+	if ctx != nil {
+		_, present := ctx.Value(grpcMonitorContextKey).(string)
+		return present
+	}
+	return false
+}
+
+// monitorActivity monitors the activity on the gRPC connection.  If there is no activity for a specified
+// timeout, it sends a default API request on that connection.  If the connection is good then nothing
+// happens.  If it is bad, this triggers reconnection attempts.
+func (c *Client) monitorActivity(ctx context.Context, handler SetAndTestServiceHandler) {
+	logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.apiEndPoint})
+
+	// Create an activity monitor channel.  An unbuffered channel would work.  However, we use a buffered
+	// channel here as a safeguard against the grpc interceptor publishing more events than this
+	// monitoring thread can consume.
+	c.activeChMutex.Lock()
+	c.activeCh = make(chan struct{}, 10)
+	c.activeChMutex.Unlock()
+
+	// Interval to wait for no activity before probing the connection
+	timeout := c.monitorInterval
+loop:
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+
+		case <-c.activeCh:
+			logger.Debugw(ctx, "received-active-notification", log.Fields{"endpoint": c.apiEndPoint})
+
+			// Reset timer
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+
+		case <-ctx.Done():
+			break loop
+
+		case <-timeoutTimer.C:
+			// Trigger an activity check if the state is connected.  If the state is not connected then there is already
+			// a backoff retry mechanism in place to retry establishing connection.
+			c.stateLock.RLock()
+			runCheck := c.state == stateConnected
+			c.stateLock.RUnlock()
+			if runCheck {
+				go func() {
+					logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.apiEndPoint})
+					subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
+					defer cancel()
+					subCtx = WithGrpcMonitorContext(subCtx, "grpc-monitor")
+					c.connectionLock.RLock()
+					defer c.connectionLock.RUnlock()
+					if c.connection != nil {
+						response := handler(subCtx, c.connection)
+						logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.apiEndPoint, "up": response != nil})
+					}
+				}()
+			}
+		}
+	}
+	logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.apiEndPoint})
+}
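+
+// The handler passed to Start() and invoked by monitorActivity is expected to build the typed client
+// from the connection and verify it with a lightweight call, returning nil on failure.  An illustrative
+// sketch, not part of this file; the GetHealthStatus call and the empty import are assumptions based on
+// the generated voltha-protos API:
+//
+//	func setAndTestCoreService(ctx context.Context, conn *grpc.ClientConn) interface{} {
+//		svc := core.NewCoreServiceClient(conn)
+//		if _, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil {
+//			return nil
+//		}
+//		return svc
+//	}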
+
+// Start kicks off the client by trying to connect to the remote endpoint and then manages the connection,
+// retrying with backoff until the context is cancelled or the client is stopped
+func (c *Client) Start(ctx context.Context, handler SetAndTestServiceHandler) {
+	logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.apiEndPoint})
+
+	// If the context contains a k8s probe then register services
+	p := probe.GetProbeFromContext(ctx)
+	if p != nil {
+		p.RegisterService(ctx, c.apiEndPoint)
+	}
+
+	// Enable activity check, if required
+	if c.activityCheck {
+		go c.monitorActivity(ctx, handler)
+	}
+
+	initialConnection := true
+	c.events <- eventConnecting
+	backoff := NewBackoff(c.backoffInitialInterval, c.backoffMaxInterval, c.backoffMaxElapsedTime)
+	attempt := 1
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
+			return
+		case event := <-c.events:
+			logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+			switch event {
+			case eventConnecting:
+				logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt})
+
+				c.stateLock.Lock()
+				if c.state == stateConnected {
+					c.state = stateDisconnected
+				}
+				if c.state != stateConnecting {
+					c.state = stateConnecting
+					go func() {
+						if err := c.connectToEndpoint(ctx, handler, p); err != nil {
+							c.stateLock.Lock()
+							c.state = stateDisconnected
+							c.stateLock.Unlock()
+							logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.apiEndPoint, "attempt": attempt, "error": err})
+
+							// Retry connection after a delay
+							if err = backoff.Backoff(ctx); err != nil {
+								// Context has closed or reached maximum elapsed time, if set
+								logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+								return
+							}
+							attempt += 1
+							c.events <- eventConnecting
+						} else {
+							backoff.Reset()
+						}
+					}()
+				}
+				c.stateLock.Unlock()
+
+			case eventConnected:
+				logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint})
+				attempt = 1
+				c.stateLock.Lock()
+				if c.state != stateConnected {
+					c.state = stateConnected
+					if initialConnection {
+						logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.apiEndPoint})
+						initialConnection = false
+					} else {
+						logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.apiEndPoint})
+						// Trigger any callback on a restart
+						go func() {
+							err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.apiEndPoint)
+							if err != nil {
+								logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.apiEndPoint})
+							}
+						}()
+					}
+				}
+				c.stateLock.Unlock()
+
+			case eventDisconnected:
+				if p != nil {
+					p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusNotReady)
+				}
+				logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "status": c.state})
+
+				// Try to connect again
+				c.events <- eventConnecting
+
+			case eventStopped:
+				logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.apiEndPoint})
+				go func() {
+					if err := c.closeConnection(ctx, p); err != nil {
+						logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
+					}
+				}()
+				break loop
+			case eventError:
+				logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.apiEndPoint})
+			default:
+				logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.apiEndPoint, "error": event})
+			}
+		}
+	}
+	logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.apiEndPoint})
+}
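+
+// An illustrative sketch, not part of this file, of how a caller typically wires the client together;
+// the endpoint value and the onRestart/setAndTestCoreService helpers are assumptions:
+//
+//	client, err := NewClient("voltha-voltha-api:55555", onRestart, ActivityCheck(true))
+//	if err != nil {
+//		return err
+//	}
+//	go client.Start(ctx, setAndTestCoreService)
+//	defer client.Stop(ctx)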
+
+func (c *Client) connectToEndpoint(ctx context.Context, handler SetAndTestServiceHandler, p *probe.Probe) error {
+	if p != nil {
+		p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusPreparing)
+	}
+
+	c.connectionLock.Lock()
+	defer c.connectionLock.Unlock()
+
+	if c.connection != nil {
+		_ = c.connection.Close()
+		c.connection = nil
+	}
+
+	c.service = nil
+
+	// Use interceptors to:
+	// 1. automatically inject and publish Open Tracing spans for the calls made by this gRPC client
+	// 2. detect connection failures on client calls so that the reconnection process can begin
+	conn, err := grpc.Dial(c.apiEndPoint,
+		grpc.WithInsecure(),
+		grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
+			grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
+		)),
+		// Chain the unary interceptors: grpc.Dial keeps only the last WithUnaryInterceptor option, so the
+		// tracing interceptor and the connection-monitoring interceptor must be registered together.
+		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
+			grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
+			c.clientInterceptor,
+		)),
+		// Set keepalive parameters based on the client's monitor and backoff intervals
+		grpc.WithKeepaliveParams(keepalive.ClientParameters{
+			Time:                c.monitorInterval,
+			Timeout:             c.backoffMaxInterval,
+			PermitWithoutStream: true,
+		}),
+	)
+
+	if err == nil {
+		subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
+		defer cancel()
+		svc := handler(subCtx, conn)
+		if svc != nil {
+			c.connection = conn
+			c.service = svc
+			if p != nil {
+				p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusRunning)
+			}
+			logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.apiEndPoint})
+			c.events <- eventConnected
+			return nil
+		}
+	}
+	logger.Warnw(ctx, "Failed to connect to endpoint",
+		log.Fields{
+			"endpoint": c.apiEndPoint,
+			"error":    err,
+		})
+
+	if p != nil {
+		p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusFailed)
+	}
+	return fmt.Errorf("no connection to endpoint %s", c.apiEndPoint)
+}
+
+func (c *Client) closeConnection(ctx context.Context, p *probe.Probe) error {
+	if p != nil {
+		p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusStopped)
+	}
+
+	c.connectionLock.Lock()
+	defer c.connectionLock.Unlock()
+
+	if c.connection != nil {
+		err := c.connection.Close()
+		c.connection = nil
+		return err
+	}
+
+	return nil
+}
+
+func (c *Client) Stop(ctx context.Context) {
+	if !c.done {
+		c.events <- eventStopped
+		close(c.events)
+		c.done = true
+	}
+}
+
+// SetService is used for testing only
+func (c *Client) SetService(srv interface{}) {
+	c.connectionLock.Lock()
+	defer c.connectionLock.Unlock()
+	c.service = srv
+}
+
+func (c *Client) SubscribeForLiveness(callback func(timestamp time.Time)) {
+	c.livenessCallback = callback
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/common.go
index cb5480b..77aad4f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/common.go
@@ -16,7 +16,7 @@
 package grpc
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/mock_core_service.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/mock_core_service.go
new file mode 100644
index 0000000..745753c
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/mock_core_service.go
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+	"context"
+	"strconv"
+	"time"
+
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+// MockCoreServiceHandler implements the methods in the core service
+type MockCoreServiceHandler struct{}
+
+func (handler *MockCoreServiceHandler) RegisterAdapter(ctx context.Context, reg *ic.AdapterRegistration) (*empty.Empty, error) {
+	//logger.Debugw(ctx, "registration-received", log.Fields{"input": reg})
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) DeviceUpdate(context.Context, *voltha.Device) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) PortCreated(context.Context, *voltha.Port) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) PortsStateUpdate(context.Context, *ic.PortStateFilter) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) DeleteAllPorts(context.Context, *common.ID) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) GetDevicePort(context.Context, *ic.PortFilter) (*voltha.Port, error) {
+	return &voltha.Port{}, nil
+}
+
+func (handler *MockCoreServiceHandler) ListDevicePorts(context.Context, *common.ID) (*voltha.Ports, error) {
+	return &voltha.Ports{}, nil
+}
+
+func (handler *MockCoreServiceHandler) DeviceStateUpdate(context.Context, *ic.DeviceStateFilter) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) DevicePMConfigUpdate(context.Context, *voltha.PmConfigs) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) ChildDeviceDetected(context.Context, *ic.DeviceDiscovery) (*voltha.Device, error) {
+	return &voltha.Device{}, nil
+}
+
+func (handler *MockCoreServiceHandler) ChildDevicesLost(context.Context, *common.ID) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) ChildDevicesDetected(context.Context, *common.ID) (*empty.Empty, error) {
+	time.Sleep(50 * time.Millisecond)
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) GetDevice(ctx context.Context, id *common.ID) (*voltha.Device, error) {
+	time.Sleep(50 * time.Millisecond)
+	vlan, _ := strconv.Atoi(id.Id)
+	return &voltha.Device{
+		Id:   id.Id,
+		Type: "test-1234",
+		Vlan: uint32(vlan),
+	}, nil
+}
+
+func (handler *MockCoreServiceHandler) GetChildDevice(context.Context, *ic.ChildDeviceFilter) (*voltha.Device, error) {
+	return nil, nil
+}
+
+func (handler *MockCoreServiceHandler) GetChildDevices(context.Context, *common.ID) (*voltha.Devices, error) {
+	return &voltha.Devices{}, nil
+}
+
+func (handler *MockCoreServiceHandler) SendPacketIn(context.Context, *ic.PacketIn) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) DeviceReasonUpdate(context.Context, *ic.DeviceReason) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) PortStateUpdate(context.Context, *ic.PortState) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+// Additional API found in the Core - unused?
+func (handler *MockCoreServiceHandler) ReconcileChildDevices(context.Context, *common.ID) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) GetChildDeviceWithProxyAddress(context.Context, *voltha.Device_ProxyAddress) (*voltha.Device, error) {
+	return &voltha.Device{}, nil
+}
+
+func (handler *MockCoreServiceHandler) GetPorts(context.Context, *ic.PortFilter) (*voltha.Ports, error) {
+	return &voltha.Ports{}, nil
+}
+
+func (handler *MockCoreServiceHandler) ChildrenStateUpdate(context.Context, *ic.DeviceStateFilter) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) UpdateImageDownload(context.Context, *voltha.ImageDownload) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+func (handler *MockCoreServiceHandler) GetHealthStatus(ctx context.Context, empty *empty.Empty) (*voltha.HealthStatus, error) {
+	return &voltha.HealthStatus{State: voltha.HealthStatus_HEALTHY}, nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/security.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/security.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/security.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/security.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/server.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/server.go
similarity index 98%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/server.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/server.go
index 38dc308..bee418d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/grpc/server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/server.go
@@ -21,7 +21,7 @@
 
 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/utils.go
new file mode 100644
index 0000000..85686de
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/utils.go
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package grpc
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"os"
+	"reflect"
+	"sync"
+	"time"
+)
+
+const (
+	incrementalFactor = 1.5
+	minBackOff        = 10 * time.Millisecond
+)
+
+type Backoff struct {
+	attempt          int
+	initialInterval  time.Duration
+	maxElapsedTime   time.Duration
+	maxInterval      time.Duration
+	totalElapsedTime time.Duration
+	mutex            sync.RWMutex
+}
+
+func NewBackoff(initialInterval, maxInterval, maxElapsedTime time.Duration) *Backoff {
+	bo := &Backoff{}
+	bo.initialInterval = initialInterval
+	bo.maxInterval = maxInterval
+	bo.maxElapsedTime = maxElapsedTime
+	return bo
+}
+
+func (bo *Backoff) Backoff(ctx context.Context) error {
+	duration, err := bo.getBackOffDuration()
+	if err != nil {
+		return err
+	}
+
+	ticker := time.NewTicker(duration)
+	defer ticker.Stop()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-ticker.C:
+	}
+	return nil
+}
+
+func (bo *Backoff) getBackOffDuration() (duration time.Duration, err error) {
+	err = nil
+	defer func() {
+		bo.mutex.Lock()
+		defer bo.mutex.Unlock()
+		bo.attempt += 1
+		bo.totalElapsedTime += duration
+		if bo.maxElapsedTime > 0 && bo.totalElapsedTime > bo.maxElapsedTime {
+			err = errors.New("max elapsed backoff time reached")
+		}
+	}()
+
+	if bo.initialInterval <= minBackOff {
+		bo.initialInterval = minBackOff
+	}
+	if bo.initialInterval > bo.maxInterval {
+		duration = bo.initialInterval
+		return
+	}
+
+	// Calculate incremental duration
+	minf := float64(bo.initialInterval)
+	durf := minf * math.Pow(incrementalFactor, float64(bo.attempt))
+
+	if durf > math.MaxInt64 {
+		duration = bo.maxInterval
+		return
+	}
+	duration = time.Duration(durf)
+
+	// Keep within bounds
+	if duration < bo.initialInterval {
+		duration = bo.initialInterval
+	}
+	if duration > bo.maxInterval {
+		duration = bo.maxInterval
+	}
+	return
+}
+
+func (bo *Backoff) Reset() {
+	bo.mutex.Lock()
+	defer bo.mutex.Unlock()
+	bo.attempt = 0
+	bo.totalElapsedTime = 0
+}
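+
+// An illustrative sketch, not part of this file, of the intended retry pattern; doWork is an assumed
+// placeholder for the operation being retried:
+//
+//	backoff := NewBackoff(DefaultBackoffInitialInterval, DefaultBackoffMaxInterval, DefaultBackoffMaxElapsedTime)
+//	for {
+//		if err := doWork(ctx); err == nil {
+//			break
+//		}
+//		if err := backoff.Backoff(ctx); err != nil {
+//			return err // context cancelled or max elapsed time reached
+//		}
+//	}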
+
+func SetFromEnvVariable(key string, variableToSet interface{}) error {
+	if _, ok := variableToSet.(*time.Duration); !ok {
+		return fmt.Errorf("unsupported type %T", variableToSet)
+	}
+	if valStr, present := os.LookupEnv(key); present {
+		val, err := time.ParseDuration(valStr)
+		if err != nil {
+			return err
+		}
+		reflect.ValueOf(variableToSet).Elem().Set(reflect.ValueOf(val))
+	}
+	return nil
+}
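+
+// An illustrative sketch, not part of this file: overriding a duration from the environment, reusing
+// the GRPC_MONITOR_INTERVAL key defined in client.go:
+//
+//	interval := DefaultGRPCMonitorInterval
+//	if err := SetFromEnvVariable("GRPC_MONITOR_INTERVAL", &interval); err != nil {
+//		logger.Warnw(ctx, "failure-reading-env-variable", log.Fields{"error": err})
+//	}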
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
similarity index 86%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/client.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
index eee9631..fdc05bc 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
@@ -12,6 +12,11 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ *
+ * NOTE:  The kafka client is used to publish events on Kafka in voltha
+ * release 2.9.  It is no longer used for inter voltha container
+ * communication.
  */
 package kafka
 
@@ -19,7 +24,7 @@
 	"context"
 	"time"
 
-	ca "github.com/opencord/voltha-protos/v4/go/inter_container"
+	"github.com/golang/protobuf/proto"
 )
 
 const (
@@ -55,6 +60,7 @@
 	DefaultNumberReplicas           = 1
 	DefaultAutoCreateTopic          = false
 	DefaultMetadataMaxRetry         = 3
+	DefaultMaxRetries               = 3
 	DefaultLivenessChannelInterval  = time.Second * 30
 )
 
@@ -64,8 +70,8 @@
 	Stop(ctx context.Context)
 	CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
 	DeleteTopic(ctx context.Context, topic *Topic) error
-	Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
-	UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ca.InterContainerMessage) error
+	Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error)
+	UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error
 	SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time))
 	Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
 	SendLiveness(ctx context.Context) error
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/common.go
index f4d7661..c0d169a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/common.go
@@ -16,7 +16,7 @@
 package kafka
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
similarity index 95%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/sarama_client.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
index 3273470..185f6ec 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
@@ -27,10 +27,8 @@
 	scc "github.com/bsm/sarama-cluster"
 	"github.com/eapache/go-resiliency/breaker"
 	"github.com/golang/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
 	"github.com/google/uuid"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 // consumerChannels represents one or more consumers listening on a kafka topic.  Once a message is received on that
@@ -38,7 +36,7 @@
 //consumer or a group consumer
 type consumerChannels struct {
 	consumers []interface{}
-	channels  []chan *ic.InterContainerMessage
+	channels  []chan proto.Message
 }
 
 // static check to ensure SaramaClient implements Client
@@ -378,7 +376,7 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -388,13 +386,13 @@
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
 		logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
-		ch := make(chan *ic.InterContainerMessage)
+		ch := make(chan proto.Message)
 		sc.addChannelToConsumerChannelMap(ctx, topic, ch)
 		return ch, nil
 	}
 
 	// Register for the topic and set it up
-	var consumerListeningChannel chan *ic.InterContainerMessage
+	var consumerListeningChannel chan proto.Message
 	var err error
 
 	// Use the consumerType option to figure out the type of consumer to launch
@@ -441,7 +439,7 @@
 }
 
 //UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -609,7 +607,7 @@
 			// without blocking others. The monitor shouldn't really fall
 			// behind...
 			sc.liveness = make(chan bool, 10)
-			// post intial state to the channel
+			// post initial state to the channel
 			sc.liveness <- sc.alive
 		}
 	} else {
@@ -635,7 +633,7 @@
 			// without blocking others. The monitor shouldn't really fall
 			// behind...
 			sc.healthiness = make(chan bool, 10)
-			// post intial state to the channel
+			// post initial state to the channel
 			sc.healthiness <- sc.healthy
 		}
 	} else {
@@ -749,7 +747,7 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -788,7 +786,7 @@
 	return err
 }
 
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -908,19 +906,18 @@
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
 // topic via the unique channel each subscriber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.RLock()
 	for _, ch := range consumerCh.channels {
-		go func(c chan *ic.InterContainerMessage) {
+		go func(c chan proto.Message) {
 			c <- protoMessage
 		}(ch)
 	}
 	sc.lockTopicToConsumerChannelMap.RUnlock()
 
 	if callback := sc.metadataCallback; callback != nil {
-		ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
-		callback(protoMessage.Header.FromTopic, ts)
+		callback(fromTopic, ts)
 	}
 }
 
@@ -948,12 +945,12 @@
 			msgBody := msg.Value
 			sc.updateLiveness(ctx, true)
 			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
-			icm := &ic.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
+			var protoMsg proto.Message
+			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
 				logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
 				continue
 			}
-			go sc.dispatchToConsumers(consumerChnls, icm)
+			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
 		case <-sc.doneCh:
 			logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
@@ -989,12 +986,12 @@
 			sc.updateLiveness(ctx, true)
 			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
-			icm := &ic.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
+			var protoMsg proto.Message
+			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
 				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
 				continue
 			}
-			go sc.dispatchToConsumers(consumerChnls, icm)
+			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
 			consumer.MarkOffset(msg, "")
 		case ntf := <-consumer.Notifications():
 			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
@@ -1030,7 +1027,7 @@
 
 //// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 //// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
@@ -1046,10 +1043,10 @@
 
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ic.InterContainerMessage)
+	consumerListeningChannel := make(chan proto.Message)
 	cc := &consumerChannels{
 		consumers: consumersIf,
-		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan proto.Message{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -1069,7 +1066,7 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
@@ -1079,10 +1076,10 @@
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ic.InterContainerMessage)
+	consumerListeningChannel := make(chan proto.Message)
 	cc := &consumerChannels{
 		consumers: []interface{}{pConsumer},
-		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan proto.Message{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -1120,9 +1117,9 @@
 	return pConsumers, nil
 }
 
-func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
 	var i int
-	var channel chan *ic.InterContainerMessage
+	var channel chan proto.Message
 	for i, channel = range channels {
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/utils.go
new file mode 100644
index 0000000..608361b
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/utils.go
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"time"
+
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+)
+
+const (
+	TopicSeparator = "_"
+	DeviceIdLength = 24
+)
+
+// A Topic definition - may be augmented with additional attributes eventually
+type Topic struct {
+	// The name of the topic. It must start with a letter,
+	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
+	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
+	// signs (`%`).
+	Name string
+}
+
+type KVArg struct {
+	Key   string
+	Value interface{}
+}
+
+type RpcMType int
+
+const (
+	RpcFormattingError RpcMType = iota
+	RpcSent
+	RpcReply
+	RpcTimeout
+	RpcTransportError
+	RpcSystemClosing
+)
+
+type RpcResponse struct {
+	MType RpcMType
+	Err   error
+	Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+	return &RpcResponse{
+		MType: messageType,
+		Err:   err,
+		Reply: body,
+	}
+}
+
+// TODO:  Remove and provide a better way to get the device id
+// GetDeviceIdFromTopic extracts the deviceId from the topic name.  The topic name is formatted either as:
+//			<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
+func GetDeviceIdFromTopic(topic Topic) string {
+	pos := strings.LastIndex(topic.Name, TopicSeparator)
+	if pos == -1 {
+		return ""
+	}
+	adjustedPos := pos + len(TopicSeparator)
+	if adjustedPos >= len(topic.Name) {
+		return ""
+	}
+	deviceId := topic.Name[adjustedPos:len(topic.Name)]
+	if len(deviceId) != DeviceIdLength {
+		return ""
+	}
+	return deviceId
+}
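+
+// An illustrative sketch, not part of this file; the topic name is a made-up example with a
+// 24-character device id suffix:
+//
+//	topic := Topic{Name: "rpc_0123456789abcdef01234567"}
+//	deviceID := GetDeviceIdFromTopic(topic) // "0123456789abcdef01234567"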
+
+// StartAndWaitUntilKafkaConnectionIsUp starts the kafka client and waits until it can establish a connection
+// to the kafka broker or until the context times out.
+func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
+	if kClient == nil {
+		return errors.New("kafka-client-is-nil")
+	}
+	for {
+		if err := kClient.Start(ctx); err != nil {
+			probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
+			select {
+			case <-time.After(connectionRetryInterval):
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+		probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+		logger.Info(ctx, "kafka-connection-up")
+		break
+	}
+	return nil
+}
+
+// MonitorKafkaReadiness checks the liveness and readiness of the kafka service
+// and updates the status in the probe.
+func MonitorKafkaReadiness(ctx context.Context,
+	kClient Client,
+	liveProbeInterval, notLiveProbeInterval time.Duration,
+	serviceName string) {
+
+	if kClient == nil {
+		logger.Fatal(ctx, "kafka-client-is-nil")
+	}
+
+	logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
+
+	livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
+	healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
+	timeout := liveProbeInterval
+	failed := false
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+
+		select {
+		case healthiness := <-healthinessChannel:
+			if !healthiness {
+				// This will eventually cause K8s to restart the container, and will do
+				// so in a way that allows cleanup to continue, rather than an immediate
+				// panic and exit here.
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
+				logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
+				failed = true
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case liveliness := <-livelinessChannel:
+			if failed {
+				// Failures of the message bus are permanent and can't ever be recovered from,
+				// so make sure we never inadvertently reset a failed state back to unready.
+			} else if !liveliness {
+				// kafka not reachable or down, updating the status to not ready state
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+				logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
+				timeout = notLiveProbeInterval
+			} else {
+				// kafka is reachable, updating the status to running state
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				timeout = liveProbeInterval
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				err := kClient.SendLiveness(ctx)
+				if err != nil {
+					// Catch possible error case if sending liveness after Sarama has been stopped.
+					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
+				}
+			}()
+		case <-ctx.Done():
+			return // just exit
+		}
+	}
+}
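+
+// An illustrative sketch, not part of this file, of how the two helpers above are typically combined
+// at service start-up; the kafka client, intervals and service name are assumptions:
+//
+//	if err := StartAndWaitUntilKafkaConnectionIsUp(ctx, kafkaClient, 5*time.Second, "kafka-cluster-proxy"); err != nil {
+//		return err
+//	}
+//	go MonitorKafkaReadiness(ctx, kafkaClient, 60*time.Second, 5*time.Second, "kafka-cluster-proxy")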
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/log/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/log/common.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/log/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/log/common.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/log/log.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/log/log.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/log/log.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/log/utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/log/utils.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/log/utils.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/log/utils.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/common.go
index a874bd1..c893312 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/common.go
@@ -16,7 +16,7 @@
 package etcd
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/etcd_server.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/etcd_server.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/etcd_server.go
index 6113b3a..951baa9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd/etcd_server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd/etcd_server.go
@@ -18,10 +18,12 @@
 import (
 	"context"
 	"fmt"
-	"go.etcd.io/etcd/embed"
 	"net/url"
 	"os"
+	"strings"
 	"time"
+
+	"go.etcd.io/etcd/embed"
 )
 
 const (
@@ -56,11 +58,15 @@
 	cfg := embed.NewConfig()
 	cfg.Name = configName
 	cfg.Dir = localPersistentStorageDir
-	cfg.Logger = "zap"
+	// cfg.Logger = "zap"
 	if !islogLevelValid(logLevel) {
 		logger.Fatalf(ctx, "Invalid log level -%s", logLevel)
 	}
-	cfg.LogLevel = logLevel
+	// cfg.LogLevel = logLevel
+	cfg.Debug = strings.EqualFold(logLevel, "debug")
+	cfg.LogPkgLevels = "*=C"
+	cfg.SetupLogging()
+
 	acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
 	if err != nil {
 		logger.Fatalf(ctx, "Invalid client port -%d", clientPort)
@@ -84,9 +90,10 @@
 //getDefaultCfg specifies the default config
 func getDefaultCfg() *embed.Config {
 	cfg := embed.NewConfig()
+	cfg.Debug = false
+	cfg.LogPkgLevels = "*=C"
+	cfg.SetupLogging()
 	cfg.Dir = defaultLocalPersistentStorage
-	cfg.Logger = "zap"
-	cfg.LogLevel = "error"
 	return cfg
 }
 
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/common.go
index 313ff9e..ff4aec9 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/common.go
@@ -16,7 +16,7 @@
 package kafka
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
similarity index 68%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_client.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
index 92966a7..ea410ac 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka/kafka_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
@@ -18,30 +18,34 @@
 import (
 	"context"
 	"fmt"
-	"github.com/golang/protobuf/ptypes"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+const (
+	maxConcurrentMessage = 100
+)
+
 // static check to ensure KafkaClient implements kafka.Client
 var _ kafka.Client = &KafkaClient{}
 
 type KafkaClient struct {
-	topicsChannelMap map[string][]chan *ic.InterContainerMessage
+	topicsChannelMap map[string][]chan proto.Message
 	lock             sync.RWMutex
+	alive            bool
+	livenessMutex    sync.Mutex
+	liveness         chan bool
 }
 
 func NewKafkaClient() *KafkaClient {
 	return &KafkaClient{
-		topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
+		topicsChannelMap: make(map[string][]chan proto.Message),
 		lock:             sync.RWMutex{},
 	}
 }
@@ -70,7 +74,7 @@
 	if _, ok := kc.topicsChannelMap[topic.Name]; ok {
 		return fmt.Errorf("Topic %s already exist", topic.Name)
 	}
-	ch := make(chan *ic.InterContainerMessage)
+	ch := make(chan proto.Message)
 	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
 	return nil
 }
@@ -83,21 +87,21 @@
 	return nil
 }
 
-func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan proto.Message, error) {
 	logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
-	ch := make(chan *ic.InterContainerMessage)
+	ch := make(chan proto.Message, maxConcurrentMessage)
 	kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
 	return ch, nil
 }
 
-func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
+func removeChannel(s []chan proto.Message, i int) []chan proto.Message {
 	s[i] = s[len(s)-1]
 	return s[:len(s)-1]
 }
 
-func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan proto.Message) error {
 	logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
@@ -120,55 +124,50 @@
 	logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
 }
 
-func toIntercontainerMessage(event *voltha.Event) *ic.InterContainerMessage {
-	msg := &ic.InterContainerMessage{
-		Header: &ic.Header{
-			Id:        event.Header.Id,
-			Type:      ic.MessageType_REQUEST,
-			Timestamp: event.Header.RaisedTs,
-		},
-	}
-	// Marshal event
-	if eventBody, err := ptypes.MarshalAny(event); err == nil {
-		msg.Body = eventBody
-	}
-	return msg
-}
-
 func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
 	// Assert message is a proto message
-	// ascertain the value interface type is a proto.Message
-	if _, ok := msg.(proto.Message); !ok {
+	protoMsg, ok := msg.(proto.Message)
+	if !ok {
 		logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
 		return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
 	}
-	req, ok := msg.(*ic.InterContainerMessage)
-	if !ok {
-		event, ok := msg.(*voltha.Event) //This is required as event message will be of type voltha.Event
-		if !ok {
-			return status.Error(codes.InvalidArgument, "unexpected-message-type")
-		}
-		req = toIntercontainerMessage(event)
-	}
-	if req == nil {
-		return status.Error(codes.InvalidArgument, "msg-nil")
-	}
 	kc.lock.RLock()
 	defer kc.lock.RUnlock()
 	for _, ch := range kc.topicsChannelMap[topic.Name] {
-		logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
-		ch <- req
+		select {
+		case ch <- protoMsg:
+			logger.Debugw(ctx, "publishing", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+		default:
+			logger.Debugw(ctx, "ignoring-event-channel-busy", log.Fields{"toTopic": topic.Name, "msg": protoMsg})
+		}
 	}
 	return nil
 }
 
 func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
-	return status.Error(codes.Unimplemented, "SendLiveness")
+	kc.livenessMutex.Lock()
+	defer kc.livenessMutex.Unlock()
+	if kc.liveness != nil {
+		kc.liveness <- true // I am a mock
+	}
+	return nil
 }
 
 func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
-	logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
-	return nil
+	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+	if enable {
+		kc.livenessMutex.Lock()
+		defer kc.livenessMutex.Unlock()
+		if kc.liveness == nil {
+			logger.Info(ctx, "kafka-create-liveness-channel")
+			kc.liveness = make(chan bool, 10)
+			// post initial state to the channel
+			kc.liveness <- kc.alive
+		}
+	} else {
+		panic("Turning off liveness reporting is not supported")
+	}
+	return kc.liveness
 }
 
 func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/probe/common.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/probe/common.go
similarity index 94%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/probe/common.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/probe/common.go
index 119d78e..6508fd4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/probe/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/probe/common.go
@@ -16,7 +16,7 @@
 package probe
 
 import (
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 var logger log.CLogger
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/probe/probe.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/probe/probe.go
similarity index 99%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/probe/probe.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/probe/probe.go
index b66f398..84a2d5f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/probe/probe.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/probe/probe.go
@@ -18,9 +18,10 @@
 import (
 	"context"
 	"fmt"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
 	"net/http"
 	"sync"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 )
 
 // ProbeContextKey used to fetch the Probe instance from a context
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/version/version.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/version/version.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v5/pkg/version/version.go
rename to vendor/github.com/opencord/voltha-lib-go/v7/pkg/version/version.go