[VOL-2833] Reporting total number of instances and current adapter instance during Adapter Registration (needs VOL-2834)

Change-Id: Ic7b0c4f52677383feb98d42ef7c68593fb30f0e3
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
 	topic := kafka.Topic{Name: ap.coreTopic}
 	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
+
+	if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+		log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+	}
+
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+		log.Fatal("currentReplica can't be 0, it has to start from 1")
+	}
+
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+		// if the adapter is not setting these fields they default to 0,
+		// in that case it means the adapter is not ready to be scaled and thus it defaults
+		// to a single instance
+		adapter.CurrentReplica = 1
+		adapter.TotalReplicas = 1
+	}
+
+	if adapter.CurrentReplica > adapter.TotalReplicas {
+		log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+			adapter.CurrentReplica, adapter.TotalReplicas)
+	}
+
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
 		Value: adapter,
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 149ab2e..ee9c5b0 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -37,6 +37,88 @@
 
 }
 
+// TestCoreProxy_RegisterAdapter_default checks that an adapter registered with unset replica fields reaches the Register RPC as replica 1 of 1.
+func TestCoreProxy_RegisterAdapter_default(t *testing.T) {
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.Device{Id: "testDevice"},
+		},
+	}
+
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+	// CurrentReplica and TotalReplicas are deliberately left at their zero values.
+	adapter := &voltha.Adapter{
+		Id:      "testAdapter",
+		Vendor:  "ONF",
+		Version: "1.0.0",
+	}
+	types := []*voltha.DeviceType{{
+		Id:                    "testolt",
+		Adapter:               "testAdapter",
+		AcceptsBulkFlowUpdate: true,
+	}}
+	devices := &voltha.DeviceTypes{Items: types}
+
+	err := proxy.RegisterAdapter(context.TODO(), adapter, devices)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, mockKafkaIcProxy.InvokeRpcSpy.CallCount)
+
+	call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+	assert.Equal(t, "Register", call.Rpc)
+	assert.Equal(t, &kafka.Topic{Name: "testCoreTopic"}, call.ToTopic)
+	assert.Equal(t, &kafka.Topic{Name: "testAdapterTopic"}, call.ReplyToTopic)
+	assert.Equal(t, true, call.WaitForResponse)
+	assert.Equal(t, "", call.Key)
+	assert.Equal(t, &kafka.KVArg{Key: "adapter", Value: &voltha.Adapter{
+		Id:             adapter.Id,
+		Vendor:         adapter.Vendor,
+		Version:        adapter.Version,
+		CurrentReplica: 1,
+		TotalReplicas:  1,
+	}}, call.KvArgs[0])
+	assert.Equal(t, &kafka.KVArg{Key: "deviceTypes", Value: devices}, call.KvArgs[1])
+}
+
+// TestCoreProxy_RegisterAdapter_multiple checks that explicitly set replica fields (4 of 8) reach the Register RPC unchanged.
+func TestCoreProxy_RegisterAdapter_multiple(t *testing.T) {
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.Device{Id: "testDevice"},
+		},
+	}
+
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+	// Both replica fields are non-zero and CurrentReplica does not exceed TotalReplicas.
+	adapter := &voltha.Adapter{
+		Id:             "testAdapter",
+		Vendor:         "ONF",
+		Version:        "1.0.0",
+		CurrentReplica: 4,
+		TotalReplicas:  8,
+	}
+	types := []*voltha.DeviceType{{
+		Id:                    "testolt",
+		Adapter:               "testAdapter",
+		AcceptsBulkFlowUpdate: true,
+	}}
+	devices := &voltha.DeviceTypes{Items: types}
+
+	err := proxy.RegisterAdapter(context.TODO(), adapter, devices)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, mockKafkaIcProxy.InvokeRpcSpy.CallCount)
+
+	call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+	assert.Equal(t, &kafka.KVArg{Key: "adapter", Value: &voltha.Adapter{
+		Id:             adapter.Id,
+		Vendor:         adapter.Vendor,
+		Version:        adapter.Version,
+		CurrentReplica: 4,
+		TotalReplicas:  8,
+	}}, call.KvArgs[0])
+}
+
 func TestCoreProxy_GetChildDevice_sn(t *testing.T) {
 
 	var mockKafkaIcProxy = mocks.MockKafkaICProxy{