VOL-3244 - remove device discovery topic

this was only used by affinity routing in conjunction
with compete model. it was removed as not being used
by anything else.

Change-Id: Ie7c611c0bc8c301ce3c01d434a06fc9fe73a5d32
diff --git a/VERSION b/VERSION
index 589ccc9..5c5bdc2 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.20
+3.1.21
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 9f9fbfc..cbde834 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -19,13 +19,14 @@
 	"context"
 	"errors"
 	"fmt"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 	"reflect"
 	"strings"
 	"sync"
 	"time"
 
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -66,7 +67,6 @@
 	Start() error
 	Stop()
 	GetDefaultTopic() *Topic
-	DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
 	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
 	InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
 	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
@@ -82,7 +82,6 @@
 	kafkaAddress                   string
 	defaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
-	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
 	doneCh                         chan struct{}
 	doneOnce                       sync.Once
@@ -118,12 +117,6 @@
 	}
 }
 
-func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.deviceDiscoveryTopic = topic
-	}
-}
-
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
 	return func(args *interContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
@@ -199,48 +192,6 @@
 	return kp.defaultTopic
 }
 
-// DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
-	logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
-	//	Simple validation
-	if deviceId == "" || deviceType == "" {
-		logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
-		return errors.New("invalid-parameters")
-	}
-	//	Create the device discovery message
-	header := &ic.Header{
-		Id:        uuid.New().String(),
-		Type:      ic.MessageType_DEVICE_DISCOVERED,
-		FromTopic: kp.defaultTopic.Name,
-		ToTopic:   kp.deviceDiscoveryTopic.Name,
-		Timestamp: ptypes.TimestampNow(),
-	}
-	body := &ic.DeviceDiscovered{
-		Id:         deviceId,
-		DeviceType: deviceType,
-		ParentId:   parentId,
-		Publisher:  publisher,
-	}
-
-	var marshalledData *any.Any
-	var err error
-	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
-		logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
-		return err
-	}
-	msg := &ic.InterContainerMessage{
-		Header: header,
-		Body:   marshalledData,
-	}
-
-	// Send the message
-	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
-		logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
-		return err
-	}
-	return nil
-}
-
 // InvokeAsyncRPC is used to make an RPC request asynchronously
 func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
diff --git a/pkg/mocks/kafka/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 34aec95..bf8582d 100644
--- a/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -61,9 +62,7 @@
 	return &t
 }
 func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
-func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
-	return nil
-}
+
 func (s *MockKafkaICProxy) Stop() {}
 
 func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,