VOL-3734 adding rpc events to the queue and sending to kafka from queue
Change-Id: I7a220fd3c7af0312ad20d4a15c0162b5c77f2044
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index 268c571..97bb135 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -18,12 +18,15 @@
import (
"context"
"fmt"
+ "github.com/golang/protobuf/ptypes"
"sync"
"time"
+ "github.com/golang/protobuf/proto"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -117,10 +120,35 @@
logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
}
+func toIntercontainerMessage(event *voltha.Event) *ic.InterContainerMessage {
+ msg := &ic.InterContainerMessage{
+ Header: &ic.Header{
+ Id: event.Header.Id,
+ Type: ic.MessageType_REQUEST,
+ Timestamp: event.Header.RaisedTs,
+ },
+ }
+ // Marshal event
+ if eventBody, err := ptypes.MarshalAny(event); err == nil {
+ msg.Body = eventBody
+ }
+ return msg
+}
+
func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
+ // Assert message is a proto message
+ // ascertain the value interface type is a proto.Message
+ if _, ok := msg.(proto.Message); !ok {
+ logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
+ return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
+ }
req, ok := msg.(*ic.InterContainerMessage)
if !ok {
- return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
+ event, ok := msg.(*voltha.Event) //This is required as event message will be of type voltha.Event
+ if !ok {
+ return status.Error(codes.InvalidArgument, "unexpected-message-type")
+ }
+ req = toIntercontainerMessage(event)
}
if req == nil {
return status.Error(codes.InvalidArgument, "msg-nil")