VOL-3616 Support for an API to retrieve information (e.g. UNI) from an ONU
This commit implements the extension service, which handles the GetExtValue and SetExtValue APIs.

Change-Id: I537160af5b70eccef77a8bc11235af3b50f9a513
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index a9767d4..b6f2b2f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -38,6 +38,7 @@
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/extension"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
@@ -966,3 +967,71 @@
 	logger.Debug(ctx, "setExtValue-Success-device-agent")
 	return &empty.Empty{}, nil
 }
+
+func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
+	logger.Debugw(ctx, "getSingleValue", log.Fields{"device-id": request.TargetId})
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	//send request to adapter
+	ch, err := agent.adapterProxy.GetSingleValue(ctx, cloned.Adapter, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	// Wait for the adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	}
+
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+
+	resp := &extension.SingleGetValueResponse{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+
+	return resp, nil
+}
+
+// setSingleValue sends a single-value set request to the device's adapter and
+// decodes the adapter's reply into a SingleSetValueResponse.
+func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
+	logger.Debugw(ctx, "setSingleValue", log.Fields{"device-id": request.TargetId})
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
+
+	// Send the request to the adapter; RequestComplete runs before waiting on the reply.
+	ch, err := agent.adapterProxy.SetSingleValue(ctx, cloned.Adapter, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	// Wait for the adapter response; the value reported on a closed channel is the agent's device id.
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+
+	resp := &extension.SingleSetValueResponse{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	return resp, nil
+}
diff --git a/rw_core/core/device/extension_manager.go b/rw_core/core/device/extension_manager.go
new file mode 100644
index 0000000..1e44ddf
--- /dev/null
+++ b/rw_core/core/device/extension_manager.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	"context"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/extension"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+type ExtensionManager struct {
+	DeviceManager *Manager
+}
+
+func GetNewExtensionManager(deviceManager *Manager) *ExtensionManager {
+	return &ExtensionManager{DeviceManager: deviceManager}
+}
+
+// GetExtValue hands the request to the agent of the device named by
+// request.TargetId; a codes.NotFound error is returned when no agent is found.
+func (e ExtensionManager) GetExtValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
+	log.EnrichSpan(ctx, log.Fields{"device-id": request.TargetId})
+	logger.Debugw(ctx, "GetExtValue", log.Fields{"request": request})
+
+	deviceAgent := e.DeviceManager.getDeviceAgent(ctx, request.TargetId)
+	if deviceAgent == nil {
+		return nil, status.Errorf(codes.NotFound, "target-id %s", request.TargetId)
+	}
+	value, err := deviceAgent.getSingleValue(ctx, request)
+	if err != nil {
+		logger.Errorw(ctx, "Fail-to-get-single-value", log.Fields{"device-id": deviceAgent.deviceID, "error": err})
+		return nil, err
+	}
+	logger.Debugw(ctx, "GetExtValue response", log.Fields{"response": value})
+	return value, nil
+}
+
+// SetExtValue hands the request to the agent of the device named by
+// request.TargetId; a codes.NotFound error is returned when no agent is found.
+func (e ExtensionManager) SetExtValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
+	log.EnrichSpan(ctx, log.Fields{"device-id": request.TargetId})
+	logger.Debugw(ctx, "SetExtValue", log.Fields{"request": request})
+
+	deviceAgent := e.DeviceManager.getDeviceAgent(ctx, request.TargetId)
+	if deviceAgent == nil {
+		return nil, status.Errorf(codes.NotFound, "target-id %s", request.TargetId)
+	}
+	result, err := deviceAgent.setSingleValue(ctx, request)
+	if err != nil {
+		logger.Errorw(ctx, "Fail-to-set-single-value", log.Fields{"device-id": deviceAgent.deviceID, "error": err})
+		return nil, err
+	}
+	logger.Debugw(ctx, "SetExtValue response", log.Fields{"response": result})
+	return result, nil
+}
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index fd9a1a0..66e7672 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -21,6 +21,7 @@
 
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/extension"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
@@ -474,3 +475,47 @@
 	replyToTopic := ap.getCoreTopic()
 	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, value.Id, args...)
 }
+
+// GetSingleValue get a value from the adapter, based on the request type
+func (ap *AdapterProxy) GetSingleValue(ctx context.Context, adapterType string, request *extension.SingleGetValueRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "GetSingleValue", log.Fields{"device-id": request.TargetId})
+	rpc := "single_get_value_request"
+	toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
+	if err != nil {
+		return nil, err
+	}
+
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	args := []*kafka.KVArg{
+		{
+			Key:   "request",
+			Value: request,
+		},
+	}
+
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
+}
+
+// SetSingleValue set a single value on the adapter, based on the request type
+func (ap *AdapterProxy) SetSingleValue(ctx context.Context, adapterType string, request *extension.SingleSetValueRequest) (chan *kafka.RpcResponse, error) {
+	logger.Debugw(ctx, "SetSingleValue", log.Fields{"device-id": request.TargetId})
+	rpc := "single_set_value_request"
+	toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
+	if err != nil {
+		return nil, err
+	}
+
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	args := []*kafka.KVArg{
+		{
+			Key:   "request",
+			Value: request,
+		},
+	}
+
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
+}