VOL-3244 - remove device discovery topic
this was only used by affinity routing in conjunction
with compete model. it was removed as not being used
by anything else.
Change-Id: Ie7c611c0bc8c301ce3c01d434a06fc9fe73a5d32
diff --git a/VERSION b/VERSION
index 589ccc9..5c5bdc2 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.20
+3.1.21
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 9f9fbfc..cbde834 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -19,13 +19,14 @@
"context"
"errors"
"fmt"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
"reflect"
"strings"
"sync"
"time"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
@@ -66,7 +67,6 @@
Start() error
Stop()
GetDefaultTopic() *Topic
- DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
@@ -82,7 +82,6 @@
kafkaAddress string
defaultTopic *Topic
defaultRequestHandlerInterface interface{}
- deviceDiscoveryTopic *Topic
kafkaClient Client
doneCh chan struct{}
doneOnce sync.Once
@@ -118,12 +117,6 @@
}
}
-func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
- return func(args *interContainerProxy) {
- args.deviceDiscoveryTopic = topic
- }
-}
-
func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
return func(args *interContainerProxy) {
args.defaultRequestHandlerInterface = handler
@@ -199,48 +192,6 @@
return kp.defaultTopic
}
-// DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
- logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
- // Simple validation
- if deviceId == "" || deviceType == "" {
- logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
- return errors.New("invalid-parameters")
- }
- // Create the device discovery message
- header := &ic.Header{
- Id: uuid.New().String(),
- Type: ic.MessageType_DEVICE_DISCOVERED,
- FromTopic: kp.defaultTopic.Name,
- ToTopic: kp.deviceDiscoveryTopic.Name,
- Timestamp: ptypes.TimestampNow(),
- }
- body := &ic.DeviceDiscovered{
- Id: deviceId,
- DeviceType: deviceType,
- ParentId: parentId,
- Publisher: publisher,
- }
-
- var marshalledData *any.Any
- var err error
- if marshalledData, err = ptypes.MarshalAny(body); err != nil {
- logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
- return err
- }
- msg := &ic.InterContainerMessage{
- Header: header,
- Body: marshalledData,
- }
-
- // Send the message
- if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
- logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
- return err
- }
- return nil
-}
-
// InvokeAsyncRPC is used to make an RPC request asynchronously
func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
diff --git a/pkg/mocks/kafka/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 34aec95..bf8582d 100644
--- a/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -18,6 +18,7 @@
import (
"context"
+
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
@@ -61,9 +62,7 @@
return &t
}
func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
-func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
- return nil
-}
+
func (s *MockKafkaICProxy) Stop() {}
func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,