VOL-2927 convert to static protos
Change-Id: If08aec0b1fb84fc54f7f62d5e4ede8ad4a9db80f
diff --git a/internal/pkg/commands/message.go b/internal/pkg/commands/message.go
index 0d9e2fb..a2e795c 100644
--- a/internal/pkg/commands/message.go
+++ b/internal/pkg/commands/message.go
@@ -20,14 +20,13 @@
"errors"
"fmt"
"github.com/Shopify/sarama"
- "github.com/fullstorydev/grpcurl"
- "github.com/golang/protobuf/ptypes/any"
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
flags "github.com/jessevdk/go-flags"
- "github.com/jhump/protoreflect/desc"
- "github.com/jhump/protoreflect/dynamic"
"github.com/opencord/voltctl/pkg/filter"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltctl/pkg/model"
+ "github.com/opencord/voltha-protos/v3/go/inter_container"
"log"
"os"
"os/signal"
@@ -134,84 +133,94 @@
ProxyDeviceId: 10,
}
+// VolthaAnyResolver is the resolver jsonpb requires to resolve Any.Any into proto.Message: Resolve returns an empty message for a known type URL, or an error.
+type VolthaAnyResolver struct{}
+
+func (r *VolthaAnyResolver) Resolve(typeURL string) (proto.Message, error) {
+ // TODO: We should be able to get this automatically via reflection using
+ // the following commented-out code, but it requires upgrading voltctl to
+ // use newer versions of protobuf libraries.
+
+ /*
+ msgType, err := protoregistry.GlobalTypes.FindMessageByURL(typeURL)
+ if err != nil {
+ return nil, err
+ }
+ return msgType.New(), nil*/
+
+ // The inter-container message bus is where we need to map from Any.Any
+ // to the appropriate protos when generating json output, so only those
+ // types are listed below; the type URL prefix is stripped before matching.
+ typeURL = strings.TrimPrefix(typeURL, "type.googleapis.com/")
+
+ switch typeURL {
+ case "voltha.StrType":
+ return &inter_container.StrType{}, nil
+ case "voltha.IntType":
+ return &inter_container.IntType{}, nil
+ case "voltha.BoolType":
+ return &inter_container.BoolType{}, nil
+ case "voltha.Packet":
+ return &inter_container.Packet{}, nil
+ case "voltha.ErrorCode":
+ return &inter_container.ErrorCode{}, nil
+ case "voltha.Error":
+ return &inter_container.Error{}, nil
+ case "voltha.Header":
+ return &inter_container.Header{}, nil
+ case "voltha.Argument":
+ return &inter_container.Argument{}, nil
+ case "voltha.InterContainerMessage":
+ return &inter_container.InterContainerMessage{}, nil
+ case "voltha.InterContainerRequestBody":
+ return &inter_container.InterContainerRequestBody{}, nil
+ case "voltha.InterContainerResponseBody":
+ return &inter_container.InterContainerResponseBody{}, nil
+ case "voltha.SwitchCapability":
+ return &inter_container.SwitchCapability{}, nil
+ case "voltha.PortCapability":
+ return &inter_container.PortCapability{}, nil
+ case "voltha.DeviceDiscovered":
+ return &inter_container.DeviceDiscovered{}, nil
+ case "voltha.InterAdapterMessageType":
+ return &inter_container.InterAdapterMessageType{}, nil
+ case "voltha.InterAdapterOmciMessage":
+ return &inter_container.InterAdapterOmciMessage{}, nil
+ case "voltha.InterAdapterTechProfileDownloadMessage":
+ return &inter_container.InterAdapterTechProfileDownloadMessage{}, nil
+ case "voltha.InterAdapterDeleteGemPortMessage":
+ return &inter_container.InterAdapterDeleteGemPortMessage{}, nil
+ case "voltha.InterAdapterDeleteTcontMessage":
+ return &inter_container.InterAdapterDeleteTcontMessage{}, nil
+ case "voltha.InterAdapterResponseBody":
+ return &inter_container.InterAdapterResponseBody{}, nil
+ case "voltha.InterAdapterMessage":
+ return &inter_container.InterAdapterMessage{}, nil
+ }
+ // No case matched: the (prefix-stripped) type name is reported in the error.
+ return nil, fmt.Errorf("Unknown any type: %s", typeURL)
+}
+
func RegisterMessageCommands(parent *flags.Parser) {
if _, err := parent.AddCommand("message", "message commands", "Commands for observing messages between components", &interAdapterOpts); err != nil {
Error.Fatalf("Unable to register message commands with voltctl command parser: %s", err.Error())
}
}
-// Find the any.Any field named by "fieldName" in the dynamic Message m.
-// Create a new dynamic message using the bytes from the Any
-// Return the new dynamic message and the type name
-func DeserializeAny(icFile *desc.FileDescriptor, m *dynamic.Message, fieldName string) (*dynamic.Message, string, error) {
- f, err := m.TryGetFieldByName(fieldName)
- if err != nil {
- return nil, "", err
- }
- a := f.(*any.Any)
- embeddedType := strings.SplitN(a.TypeUrl, "/", 2)[1] // example type.googleapis.com/voltha.InterContainerRequestBody
- embeddedBytes := a.Value
-
- md := icFile.FindMessage(embeddedType)
-
- embeddedM := dynamic.NewMessage(md)
- err = embeddedM.Unmarshal(embeddedBytes)
- if err != nil {
- return nil, "", err
- }
-
- return embeddedM, embeddedType, nil
-}
-
// Extract the header, as well as a few other items that might be of interest
-func DecodeInterContainerHeader(icFile *desc.FileDescriptor, md *desc.MessageDescriptor, b []byte, ts time.Time, f grpcurl.Formatter) (*MessageHeader, error) {
- m := dynamic.NewMessage(md)
- if err := m.Unmarshal(b); err != nil {
+func DecodeInterContainerHeader(b []byte, ts time.Time) (*MessageHeader, error) {
+ m := &inter_container.InterContainerMessage{}
+ if err := proto.Unmarshal(b, m); err != nil {
return nil, err
}
- headerIntf, err := m.TryGetFieldByName("header")
- if err != nil {
- return nil, err
- }
-
- header := headerIntf.(*dynamic.Message)
-
- idIntf, err := header.TryGetFieldByName("id")
- if err != nil {
- return nil, err
- }
- id := idIntf.(string)
-
- typeIntf, err := header.TryGetFieldByName("type")
- if err != nil {
- return nil, err
- }
- msgType := typeIntf.(int32)
-
- fromTopicIntf, err := header.TryGetFieldByName("from_topic")
- if err != nil {
- return nil, err
- }
- fromTopic := fromTopicIntf.(string)
-
- toTopicIntf, err := header.TryGetFieldByName("to_topic")
- if err != nil {
- return nil, err
- }
- toTopic := toTopicIntf.(string)
-
- keyTopicIntf, err := header.TryGetFieldByName("key_topic")
- if err != nil {
- return nil, err
- }
- keyTopic := keyTopicIntf.(string)
-
- timestampIntf, err := header.TryGetFieldByName("timestamp")
- if err != nil {
- return nil, err
- }
- timestamp, err := DecodeTimestamp(timestampIntf)
+ header := m.GetHeader() // generated getters are nil-safe: a payload without a header must not panic
+ id := header.GetId()
+ msgType := header.GetType()
+ fromTopic := header.GetFromTopic()
+ toTopic := header.GetToTopic()
+ keyTopic := header.GetKeyTopic()
+ timestamp, err := DecodeTimestamp(header.GetTimestamp())
if err != nil {
return nil, err
}
@@ -222,65 +231,47 @@
var iaMessageTypeStr string
var toDeviceId string
var proxyDeviceId string
- body, bodyKind, err := DeserializeAny(icFile, m, "body")
+
+ bodyKind, err := ptypes.AnyMessageName(m.Body)
if err != nil {
return nil, err
}
+
switch bodyKind {
case "voltha.InterContainerRequestBody":
- argListIntf, err := body.TryGetFieldByName("args")
+ icRequest := &inter_container.InterContainerRequestBody{}
+ err := ptypes.UnmarshalAny(m.Body, icRequest)
if err != nil {
return nil, err
}
- argList := argListIntf.([]interface{})
- for _, argIntf := range argList {
- arg := argIntf.(*dynamic.Message)
- keyIntf, err := arg.TryGetFieldByName("key")
- if err != nil {
- return nil, err
- }
- key := keyIntf.(string)
+
+ argList := icRequest.Args
+ for _, arg := range argList {
+ key := arg.Key
if key == "msg" {
- argBody, argBodyKind, err := DeserializeAny(icFile, arg, "value")
+ argBodyKind, err := ptypes.AnyMessageName(arg.Value) // classify this argument's own payload, not the enclosing request body
if err != nil {
return nil, err
}
switch argBodyKind {
case "voltha.InterAdapterMessage":
- iaHeaderIntf, err := argBody.TryGetFieldByName("header")
+ iaMsg := &inter_container.InterAdapterMessage{}
+ err := ptypes.UnmarshalAny(arg.Value, iaMsg)
if err != nil {
return nil, err
}
- iaHeader := iaHeaderIntf.(*dynamic.Message)
- iaMessageTypeIntf, err := iaHeader.TryGetFieldByName("type")
- if err != nil {
- return nil, err
- }
- iaMessageType := iaMessageTypeIntf.(int32)
- iaMessageTypeStr, err = model.GetEnumString(iaHeader, "type", iaMessageType)
- if err != nil {
- return nil, err
- }
+ iaHeader := iaMsg.GetHeader() // getters tolerate a missing inter-adapter header instead of panicking
+ iaMessageType := iaHeader.GetType()
+ iaMessageTypeStr = inter_container.InterAdapterMessageType_Types_name[int32(iaMessageType)]
- toDeviceIdIntf, err := iaHeader.TryGetFieldByName("to_device_id")
- if err != nil {
- return nil, err
- }
- toDeviceId = toDeviceIdIntf.(string)
-
- proxyDeviceIdIntf, err := iaHeader.TryGetFieldByName("proxy_device_id")
- if err != nil {
- return nil, err
- }
- proxyDeviceId = proxyDeviceIdIntf.(string)
+ toDeviceId = iaHeader.GetToDeviceId()
+ proxyDeviceId = iaHeader.GetProxyDeviceId()
}
}
}
}
- messageHeaderType, err := model.GetEnumString(header, "type", msgType)
- if err != nil {
- return nil, err
- }
+
+ messageHeaderType := inter_container.MessageType_name[int32(msgType)]
icHeader := MessageHeader{Id: id,
Type: messageHeaderType,
@@ -298,17 +289,24 @@
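-// Print the full message, either in JSON or in GRPCURL-human-readable format,
-// depending on which grpcurl formatter is passed in.
+// Print the full message, either as JSON (when outputAs is "json") or with
+// Go's default %v formatting for any other outputAs value.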
-func PrintInterContainerMessage(f grpcurl.Formatter, md *desc.MessageDescriptor, b []byte) error {
- m := dynamic.NewMessage(md)
- err := m.Unmarshal(b)
- if err != nil {
+func PrintInterContainerMessage(outputAs string, b []byte) error {
+ ms := &inter_container.InterContainerMessage{}
+ if err := proto.Unmarshal(b, ms); err != nil {
return err
}
- s, err := f(m)
- if err != nil {
- return err
+ // JSON output goes through jsonpb, with VolthaAnyResolver supplied for the Any-typed fields.
+ if outputAs == "json" {
+ marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
+ asJson, err := marshaler.MarshalToString(ms)
+ if err != nil {
+ return fmt.Errorf("Failed to marshal the json: %s", err)
+ }
+ fmt.Println(asJson)
+ } else {
+ // any other outputAs value: print the message with Go's default %v formatting
+ fmt.Printf("%v\n", ms)
}
- fmt.Println(s)
+
return nil
}
@@ -333,23 +331,6 @@
return nil
}
-// Get the FileDescriptor that has the InterContainer protos
-func GetInterContainerDescriptorFile() (*desc.FileDescriptor, error) {
- descriptor, err := GetDescriptorSource()
- if err != nil {
- return nil, err
- }
-
- // get the symbol for voltha.InterContainerMessage
- iaSymbol, err := descriptor.FindSymbol("voltha.InterContainerMessage")
- if err != nil {
- return nil, err
- }
-
- icFile := iaSymbol.GetFile()
- return icFile, nil
-}
-
// Start output, print any column headers or other start characters
func (options *MessageListenOpts) StartOutput(outputFormat string) error {
if options.OutputAs == "json" {
@@ -378,13 +359,6 @@
return errors.New("Kafka address is not specified")
}
- icFile, err := GetInterContainerDescriptorFile()
- if err != nil {
- return err
- }
-
- icMessage := icFile.FindMessage("voltha.InterContainerMessage")
-
config := sarama.NewConfig()
config.ClientID = "go-kafka-consumer"
config.Consumer.Return.Errors = true
@@ -418,22 +392,6 @@
// Count how many messages were printed
count := 0
- var grpcurlFormatter grpcurl.Formatter
- // need a descriptor source, any method will do
- descriptor, _, err := GetMethod("device-list")
- if err != nil {
- return err
- }
-
- jsonFormatter := grpcurl.NewJSONFormatter(false, grpcurl.AnyResolverFromDescriptorSource(descriptor))
- if options.ShowBody {
- if options.OutputAs == "json" {
- grpcurlFormatter = jsonFormatter
- } else {
- grpcurlFormatter = grpcurl.NewTextFormatter(false)
- }
- }
-
var headerFilter *filter.Filter
if options.Filter != "" {
headerFilterVal, err := filter.Parse(options.Filter)
@@ -472,7 +430,7 @@
select {
case msg := <-consumer:
consumeCount++
- hdr, err := DecodeInterContainerHeader(icFile, icMessage, msg.Value, msg.Timestamp, jsonFormatter)
+ hdr, err := DecodeInterContainerHeader(msg.Value, msg.Timestamp)
if err != nil {
log.Printf("Error decoding header %v\n", err)
continue
@@ -491,7 +449,7 @@
// Print it
if options.ShowBody {
- if err := PrintInterContainerMessage(grpcurlFormatter, icMessage, msg.Value); err != nil {
+ if err := PrintInterContainerMessage(options.OutputAs, msg.Value); err != nil {
log.Printf("%v\n", err)
}
} else {