[VOL-3317]Replace the toAdapter in InterAdapterHeader with the output from getEndPoint
Change-Id: I22808d5e2d0e66212b6cb65342a9db2a3464cb97
diff --git a/VERSION b/VERSION
index b347b11..351227f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.3
+3.2.4
diff --git a/pkg/adapters/common/adapter_proxy.go b/pkg/adapters/common/adapter_proxy.go
index ca44d0d..cb5d1bb 100644
--- a/pkg/adapters/common/adapter_proxy.go
+++ b/pkg/adapters/common/adapter_proxy.go
@@ -30,19 +30,17 @@
type AdapterProxy struct {
kafkaICProxy kafka.InterContainerProxy
- adapterTopic string
coreTopic string
endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
proxy := AdapterProxy{
kafkaICProxy: kafkaProxy,
- adapterTopic: adapterTopic,
coreTopic: coreTopic,
endpointMgr: kafka.NewEndpointManager(backend),
}
- logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+ logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
return &proxy
}
@@ -65,11 +63,17 @@
return err
}
+ // Set up the required rpc arguments
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+
//Build the inter adapter message
header := &ic.InterAdapterHeader{
Type: msgType,
FromTopic: fromAdapter,
- ToTopic: toAdapter,
+ ToTopic: string(endpoint),
ToDeviceId: toDeviceId,
ProxyDeviceId: proxyDeviceId,
}
@@ -89,11 +93,6 @@
Value: iaMsg,
}
- // Set up the required rpc arguments
- endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
- if err != nil {
- return err
- }
topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
diff --git a/pkg/adapters/common/adapter_proxy_test.go b/pkg/adapters/common/adapter_proxy_test.go
index 01a6603..d86a895 100644
--- a/pkg/adapters/common/adapter_proxy_test.go
+++ b/pkg/adapters/common/adapter_proxy_test.go
@@ -56,7 +56,7 @@
},
}
backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
assert.NotNil(t, adapter)
}
@@ -71,7 +71,7 @@
}
backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
adapter.endpointMgr = mocks.NewEndpointManager()
@@ -115,7 +115,7 @@
}
backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
adapter.endpointMgr = mocks.NewEndpointManager()
@@ -143,7 +143,7 @@
}
backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testCoreTopic", backend)
adapter.endpointMgr = mocks.NewEndpointManager()