[VOL-2163] Supporting Async request
Introduces InvokeAsyncRPC to support asynchronous requests
Change-Id: Ica947a30140605d46518aa6c73f6661c0645ce92
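
For context, a minimal sketch of how a caller might consume the channel returned by InvokeAsyncRPC. The topic names, the "enable_device" RPC, the "device_id" argument key and the voltha.ID payload are illustrative assumptions, not part of this change:

package example

import (
	"context"
	"time"

	"github.com/golang/protobuf/ptypes/any"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// enableDevice issues an asynchronous RPC and waits on the returned channel:
// the first message acknowledges the send (RpcSent), a later message carries
// the reply or a failure, and the channel is closed when the call is done.
func enableDevice(icProxy kafka.InterContainerProxy, deviceID string) (*any.Any, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	toTopic := kafka.Topic{Name: "openolt"}   // adapter topic (illustrative)
	replyTopic := kafka.Topic{Name: deviceID} // per-device reply topic (illustrative)
	arg := &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: deviceID}}

	ch := icProxy.InvokeAsyncRPC(ctx, "enable_device", &toTopic, &replyTopic, true, deviceID, arg)
	for resp := range ch {
		switch resp.MType {
		case kafka.RpcSent:
			continue // request handed to the kafka client; keep waiting
		case kafka.RpcReply:
			return resp.Reply, resp.Err
		default: // RpcFormattingError, RpcTransportError, RpcTimeout, RpcSystemClosing
			return nil, resp.Err
		}
	}
	return nil, nil // channel closed without a terminal message
}
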
diff --git a/VERSION b/VERSION
index 25875f0..3e4a61b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.19
+3.0.20
diff --git a/pkg/adapters/adapterif/core_proxy_if.go b/pkg/adapters/adapterif/core_proxy_if.go
index dbf3418..9636a7d 100644
--- a/pkg/adapters/adapterif/core_proxy_if.go
+++ b/pkg/adapters/adapterif/core_proxy_if.go
@@ -18,7 +18,6 @@
import (
"context"
-
"github.com/opencord/voltha-protos/v3/go/voltha"
)
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 1014ada..d38f0f6 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -29,13 +29,13 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
- ectdAPI *v3Client.Client
- keyReservations map[string]*v3Client.LeaseID
- watchedChannels sync.Map
- writeLock sync.Mutex
- lockToMutexMap map[string]*v3Concurrency.Mutex
- lockToSessionMap map[string]*v3Concurrency.Session
- lockToMutexLock sync.Mutex
+ ectdAPI *v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
+ watchedChannels sync.Map
+ keyReservationsLock sync.RWMutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
+ lockToSessionMap map[string]*v3Concurrency.Session
+ lockToMutexLock sync.Mutex
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -114,13 +114,13 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
- if leaseID, ok := c.keyReservations[key]; ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok := c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+ if ok {
_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
} else {
_, err = c.ectdAPI.Put(ctx, key, val)
@@ -146,9 +146,6 @@
// wait for a response
func (c *EtcdClient) Delete(ctx context.Context, key string) error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
// delete the key
if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
logger.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
@@ -177,9 +174,9 @@
return nil, err
}
// Register the lease id
- c.writeLock.Lock()
+ c.keyReservationsLock.Lock()
c.keyReservations[key] = &resp.ID
- c.writeLock.Unlock()
+ c.keyReservationsLock.Unlock()
// Revoke lease if reservation is not successful
reservationSuccessful := false
@@ -235,8 +232,8 @@
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
for key, leaseID := range c.keyReservations {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -255,8 +252,8 @@
logger.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
if leaseID, ok = c.keyReservations[key]; !ok {
return nil
}
@@ -278,9 +275,11 @@
// Get the leaseid using the key
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- if leaseID, ok = c.keyReservations[key]; !ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok = c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+
+ if !ok {
return errors.New("key-not-reserved")
}
@@ -372,8 +371,6 @@
// Get the array of channels mapping
var watchedChannels []map[chan *Event]v3Client.Watcher
var ok bool
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if watchedChannels, ok = c.getChannelMaps(key); !ok {
logger.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
@@ -425,8 +422,6 @@
// Close closes the KV store client
func (c *EtcdClient) Close() {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if err := c.ectdAPI.Close(); err != nil {
logger.Errorw("error-closing-client", log.Fields{"error": err})
}
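
For reference, the refactor above replaces the coarse writeLock (previously held across etcd Puts, Deletes, reservation bookkeeping and Close) with a dedicated keyReservationsLock that only guards the keyReservations map. A simplified, standalone sketch of that read/write-lock pattern, not the actual EtcdClient:

package example

import "sync"

// reservations mimics the keyReservations map guarded by its own RWMutex.
type reservations struct {
	mu    sync.RWMutex
	byKey map[string]int64 // key -> lease ID (illustrative value type)
}

// lookup takes only a read lock, so concurrent readers (e.g. Put checking for
// an existing lease) no longer serialize on a single mutex.
func (r *reservations) lookup(key string) (int64, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	id, ok := r.byKey[key]
	return id, ok
}

// register takes the write lock only for the short map update, instead of
// holding a global lock across the network call that created the lease.
func (r *reservations) register(key string, leaseID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.byKey[key] = leaseID
}
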
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index c7dc5af..5dbde9c 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -19,6 +19,8 @@
"context"
"errors"
"fmt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"reflect"
"strings"
"sync"
@@ -34,7 +36,7 @@
const (
DefaultMaxRetries = 3
- DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
)
const (
@@ -66,6 +68,7 @@
GetDefaultTopic() *Topic
DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+ InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
UnSubscribeFromRequestHandler(topic Topic) error
@@ -246,6 +249,104 @@
return nil
}
+// InvokeAsyncRPC is used to make an RPC request asynchronously
+func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+ waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
+
+ logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+ // If a replyToTopic is provided then use it, otherwise fall back to the proxy's default topic. The replyToTopic is
+ // typically the device ID.
+ responseTopic := replyToTopic
+ if responseTopic == nil {
+ responseTopic = kp.GetDefaultTopic()
+ }
+
+ chnl := make(chan *RpcResponse)
+
+ go func() {
+
+ // once we're done,
+ // close the response channel
+ defer close(chnl)
+
+ var err error
+ var protoRequest *ic.InterContainerMessage
+
+ // Encode the request
+ protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+ if err != nil {
+ logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ chnl <- NewResponse(RpcFormattingError, err, nil)
+ return
+ }
+
+ // Subscribe for response, if needed, before sending request
+ var ch <-chan *ic.InterContainerMessage
+ if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+ logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+ // Send request - if the topic is formatted with a device Id then we will send the request using a
+ // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
+ // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
+ logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+
+ // if the message cannot be sent on kafka, publish an error response and close the channel
+ if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+ // send an ack to confirm the request went out; if the client is not waiting for a response, close the channel and return
+ chnl <- NewResponse(RpcSent, nil, nil)
+ if !waitForResponse {
+ return
+ }
+
+ defer func() {
+ // Remove the subscription for a response on return
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+ }
+ }()
+
+ // Wait for response as well as timeout or cancellation
+ select {
+ case msg, ok := <-ch:
+ if !ok {
+ logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
+ }
+ logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ if responseBody, err := decodeResponse(msg); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ if responseBody.Success {
+ chnl <- NewResponse(RpcReply, nil, responseBody.Result)
+ } else {
+ // response body contains an error
+ unpackErr := &ic.Error{}
+ if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
+ }
+ }
+ }
+ case <-ctx.Done():
+ logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
+ chnl <- NewResponse(RpcTimeout, err, nil)
+ case <-kp.doneCh:
+ chnl <- NewResponse(RpcSystemClosing, nil, nil)
+ logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ }
+ }()
+ return chnl
+}
+
// InvokeRPC is used to send a request to a given topic
func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
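
Because each InvokeAsyncRPC call returns its own channel and does its waiting in a goroutine, several requests can be in flight at once. A hedged fan-out sketch (topic and RPC names are illustrative; real calls would normally carry proto-encoded kvArgs):

package example

import (
	"context"
	"time"

	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// disableAll issues one async RPC per device and collects the terminal
// response for each: the last message received before the channel closes.
func disableAll(icProxy kafka.InterContainerProxy, deviceIDs []string) map[string]*kafka.RpcResponse {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	toTopic := kafka.Topic{Name: "openolt"} // illustrative adapter topic
	channels := make(map[string]chan *kafka.RpcResponse, len(deviceIDs))
	for _, id := range deviceIDs {
		replyTopic := kafka.Topic{Name: id}
		channels[id] = icProxy.InvokeAsyncRPC(ctx, "disable_device", &toTopic, &replyTopic, true, id)
	}

	results := make(map[string]*kafka.RpcResponse, len(deviceIDs))
	for id, ch := range channels {
		for resp := range ch {
			results[id] = resp // keep the most recent message; the terminal one arrives last
		}
	}
	return results
}
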
diff --git a/pkg/kafka/utils.go b/pkg/kafka/utils.go
index 0cb9535..bdc615f 100644
--- a/pkg/kafka/utils.go
+++ b/pkg/kafka/utils.go
@@ -15,7 +15,10 @@
*/
package kafka
-import "strings"
+import (
+ "github.com/golang/protobuf/ptypes/any"
+ "strings"
+)
const (
TopicSeparator = "_"
@@ -36,6 +39,31 @@
Value interface{}
}
+type RpcMType int
+
+const (
+ RpcFormattingError RpcMType = iota
+ RpcSent
+ RpcReply
+ RpcTimeout
+ RpcTransportError
+ RpcSystemClosing
+)
+
+type RpcResponse struct {
+ MType RpcMType
+ Err error
+ Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+ return &RpcResponse{
+ MType: messageType,
+ Err: err,
+ Reply: body,
+ }
+}
+
// TODO: Remove and provide better may to get the device id
// GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:
// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka_inter_container_proxy.go
index 405fbe7..9879830 100644
--- a/pkg/mocks/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka_inter_container_proxy.go
@@ -42,6 +42,13 @@
Response proto.Message
}
+type InvokeAsyncRpcSpy struct {
+ CallCount int
+ Calls map[int]InvokeRpcArgs
+ Timeout bool
+ Response proto.Message
+}
+
type MockKafkaICProxy struct {
InvokeRpcSpy InvokeRpcSpy
}
@@ -58,6 +65,29 @@
return nil
}
func (s *MockKafkaICProxy) Stop() {}
+
+func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
+ waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
+
+ args := make(map[int]interface{}, 4)
+ for k, v := range kvArgs {
+ args[k] = v
+ }
+
+ s.InvokeRpcSpy.CallCount++
+ s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+ Rpc: rpc,
+ ToTopic: toTopic,
+ ReplyToTopic: replyToTopic,
+ WaitForResponse: waitForResponse,
+ Key: key,
+ KvArgs: args,
+ }
+
+ chnl := make(chan *kafka.RpcResponse)
+
+ return chnl
+}
+
func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
s.InvokeRpcSpy.CallCount++
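
The mock above returns an open channel without ever writing to it, so code under test that ranges over the result would block; tests exercising the async path need to feed the channel themselves. A hedged, test-local helper (not part of this change) that mimics the real ordering (RpcSent first, terminal message last, then close):

package example

import (
	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// cannedAsyncReply builds a channel that behaves like a successful
// InvokeAsyncRPC call: it acknowledges the send, delivers the reply, then closes.
func cannedAsyncReply(reply proto.Message) chan *kafka.RpcResponse {
	ch := make(chan *kafka.RpcResponse)
	go func() {
		defer close(ch)
		ch <- kafka.NewResponse(kafka.RpcSent, nil, nil)
		body, err := ptypes.MarshalAny(reply)
		if err != nil {
			ch <- kafka.NewResponse(kafka.RpcFormattingError, err, nil)
			return
		}
		ch <- kafka.NewResponse(kafka.RpcReply, nil, body)
	}()
	return ch
}
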