[VOL-2833] Reporting total number of instances and current adapter instance during Adapter Registration (needs VOL-2834)
Change-Id: Ic7b0c4f52677383feb98d42ef7c68593fb30f0e3
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
topic := kafka.Topic{Name: ap.coreTopic}
replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
+
+ if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+ log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+ log.Fatal("currentReplica can't be 0, it has to start from 1")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+ // if the adapter is not setting these fields they default to 0,
+ // in that case it means the adapter is not ready to be scaled and thus it defaults
+ // to a single instance
+ adapter.CurrentReplica = 1
+ adapter.TotalReplicas = 1
+ }
+
+ if adapter.CurrentReplica > adapter.TotalReplicas {
+ log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ adapter.CurrentReplica, adapter.TotalReplicas)
+ }
+
args[0] = &kafka.KVArg{
Key: "adapter",
Value: adapter,
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 149ab2e..ee9c5b0 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -37,6 +37,88 @@
}
+// TestCoreProxy_RegisterAdapter_default checks that an adapter registered without
+// CurrentReplica/TotalReplicas is sent as the "adapter" argument with both set to 1.
+func TestCoreProxy_RegisterAdapter_default(t *testing.T) {
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.Device{Id: "testDevice"},
+		},
+	}
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	adapter := &voltha.Adapter{
+		Id:      "testAdapter",
+		Vendor:  "ONF",
+		Version: "1.0.0",
+	}
+	devices := &voltha.DeviceTypes{Items: []*voltha.DeviceType{{
+		Id:                    "testolt",
+		Adapter:               "testAdapter",
+		AcceptsBulkFlowUpdate: true,
+	}}}
+
+	err := proxy.RegisterAdapter(context.TODO(), adapter, devices)
+	// Expected values go first: testify's signature is Equal(t, expected, actual).
+	assert.Equal(t, 1, mockKafkaIcProxy.InvokeRpcSpy.CallCount)
+	assert.NoError(t, err)
+
+	call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+	assert.Equal(t, "Register", call.Rpc)
+	assert.Equal(t, &kafka.Topic{Name: "testCoreTopic"}, call.ToTopic)
+	assert.Equal(t, &kafka.Topic{Name: "testAdapterTopic"}, call.ReplyToTopic)
+	assert.Equal(t, true, call.WaitForResponse)
+	assert.Equal(t, "", call.Key)
+	assert.Equal(t, &kafka.KVArg{Key: "adapter", Value: &voltha.Adapter{
+		Id:             adapter.Id,
+		Vendor:         adapter.Vendor,
+		Version:        adapter.Version,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+	}}, call.KvArgs[0])
+	assert.Equal(t, &kafka.KVArg{Key: "deviceTypes", Value: devices}, call.KvArgs[1])
+}
+
+// TestCoreProxy_RegisterAdapter_multiple checks that explicitly set replica fields
+// (replica 4 of 8) are forwarded unchanged in the "adapter" argument.
+func TestCoreProxy_RegisterAdapter_multiple(t *testing.T) {
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.Device{Id: "testDevice"},
+		},
+	}
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	adapter := &voltha.Adapter{
+		Id:             "testAdapter",
+		Vendor:         "ONF",
+		Version:        "1.0.0",
+		CurrentReplica: 4,
+		TotalReplicas:  8,
+	}
+	devices := &voltha.DeviceTypes{Items: []*voltha.DeviceType{{
+		Id:                    "testolt",
+		Adapter:               "testAdapter",
+		AcceptsBulkFlowUpdate: true,
+	}}}
+
+	err := proxy.RegisterAdapter(context.TODO(), adapter, devices)
+	// Expected values go first: testify's signature is Equal(t, expected, actual).
+	assert.Equal(t, 1, mockKafkaIcProxy.InvokeRpcSpy.CallCount)
+	assert.NoError(t, err)
+
+	call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+	assert.Equal(t, &kafka.KVArg{Key: "adapter", Value: &voltha.Adapter{
+		Id:             adapter.Id,
+		Vendor:         adapter.Vendor,
+		Version:        adapter.Version,
+		CurrentReplica: 4,
+		TotalReplicas:  8,
+	}}, call.KvArgs[0])
+}
+
func TestCoreProxy_GetChildDevice_sn(t *testing.T) {
var mockKafkaIcProxy = mocks.MockKafkaICProxy{