gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/internal/pkg/commands/adapter.go b/internal/pkg/commands/adapter.go
index 73b91e1..b299a8e 100644
--- a/internal/pkg/commands/adapter.go
+++ b/internal/pkg/commands/adapter.go
@@ -21,11 +21,12 @@
"github.com/golang/protobuf/ptypes/empty"
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltctl/pkg/model"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
- DEFAULT_OUTPUT_FORMAT = "table{{ .Id }}\t{{ .Vendor }}\t{{ .Type }}\t{{ .Endpoint }}\t{{ .Version }}\t{{ .CurrentReplica }}\t{{ .TotalReplicas }}\t{{ since .LastCommunication}}"
+ DEFAULT_OUTPUT_FORMAT = "table{{ .Id }}\t{{ .Vendor }}\t{{ .Type }}\t{{ .Endpoint }}\t{{ .Version }}\t{{ .CurrentReplica }}\t{{ .TotalReplicas }}\t{{ gosince .LastCommunication}}"
)
type AdapterList struct {
@@ -60,6 +61,10 @@
if err != nil {
return err
}
+ data := make([]model.AdapterInstance, len(adapters.Items))
+ for i, item := range adapters.Items {
+ data[i].PopulateFrom(item)
+ }
outputFormat := CharReplacer.Replace(options.Format)
if outputFormat == "" {
@@ -82,7 +87,7 @@
OrderBy: orderBy,
OutputAs: toOutputType(options.OutputAs),
NameLimit: options.NameLimit,
- Data: adapters.Items,
+ Data: data,
}
GenerateOutput(&result)
diff --git a/internal/pkg/commands/devicegroups.go b/internal/pkg/commands/devicegroups.go
index f42547b..6ce4ebb 100644
--- a/internal/pkg/commands/devicegroups.go
+++ b/internal/pkg/commands/devicegroups.go
@@ -21,7 +21,7 @@
"github.com/golang/protobuf/ptypes/empty"
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
diff --git a/internal/pkg/commands/devices.go b/internal/pkg/commands/devices.go
index 0bcda77..cce5f05 100644
--- a/internal/pkg/commands/devices.go
+++ b/internal/pkg/commands/devices.go
@@ -26,9 +26,9 @@
"github.com/golang/protobuf/ptypes/empty"
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/common"
- "github.com/opencord/voltha-protos/v4/go/extension"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/extension"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
diff --git a/internal/pkg/commands/events.go b/internal/pkg/commands/events.go
index 06f6aee..76d4dbf 100644
--- a/internal/pkg/commands/events.go
+++ b/internal/pkg/commands/events.go
@@ -28,12 +28,11 @@
"github.com/Shopify/sarama"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/filter"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
@@ -124,8 +123,7 @@
ts, okay := tsIntf.(*timestamp.Timestamp)
if okay {
// Voltha-Protos 3.2.3 and above
- result, err := ptypes.Timestamp(ts)
- return result, err
+ return ts.AsTime(), nil
}
tsFloat, okay := tsIntf.(float32)
if okay {
@@ -231,8 +229,9 @@
}
if outputAs == "json" {
- marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
+ marshaler := jsonpb.Marshaler{EmitDefaults: true}
asJson, err := marshaler.MarshalToString(ms)
+
if err != nil {
return fmt.Errorf("Failed to marshal the json: %s", err)
}
diff --git a/internal/pkg/commands/flows.go b/internal/pkg/commands/flows.go
index 4127545..378d01d 100644
--- a/internal/pkg/commands/flows.go
+++ b/internal/pkg/commands/flows.go
@@ -20,8 +20,8 @@
"fmt"
"github.com/opencord/voltctl/pkg/format"
"github.com/opencord/voltctl/pkg/model"
- "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"sort"
"strings"
)
diff --git a/internal/pkg/commands/groups.go b/internal/pkg/commands/groups.go
index 5e71e8f..e1f6bc0 100644
--- a/internal/pkg/commands/groups.go
+++ b/internal/pkg/commands/groups.go
@@ -21,8 +21,8 @@
"fmt"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
diff --git a/internal/pkg/commands/log.go b/internal/pkg/commands/log.go
index ecd3e5f..fd05e70 100644
--- a/internal/pkg/commands/log.go
+++ b/internal/pkg/commands/log.go
@@ -25,9 +25,9 @@
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/format"
"github.com/opencord/voltctl/pkg/model"
- "github.com/opencord/voltha-lib-go/v6/pkg/config"
- "github.com/opencord/voltha-lib-go/v6/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
diff --git a/internal/pkg/commands/logicaldevices.go b/internal/pkg/commands/logicaldevices.go
index 2f2a1bc..69ead3b2 100644
--- a/internal/pkg/commands/logicaldevices.go
+++ b/internal/pkg/commands/logicaldevices.go
@@ -21,8 +21,8 @@
"github.com/golang/protobuf/ptypes/empty"
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"strings"
)
diff --git a/internal/pkg/commands/message.go b/internal/pkg/commands/message.go
deleted file mode 100644
index 338affd..0000000
--- a/internal/pkg/commands/message.go
+++ /dev/null
@@ -1,566 +0,0 @@
-/*
- * Copyright 2019-present Ciena Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package commands
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "github.com/Shopify/sarama"
- "github.com/golang/protobuf/jsonpb"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- flags "github.com/jessevdk/go-flags"
- "github.com/opencord/voltctl/pkg/filter"
- "github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/inter_container"
- "log"
- "os"
- "os/signal"
- "strings"
- "time"
-)
-
-/*
- * The "message listen" command supports two types of output:
- * 1) A summary output where a row is displayed for each message received. For the summary
- * format, DEFAULT_MESSAGE_FORMAT contains the default list of columns that will be
- * display and can be overridden at runtime.
- * 2) A body output where the full grpcurl or json body is output for each message received.
- *
- * These two modes are switched by using the "-b" / "--body" flag.
- *
- * The summary mode has the potential to aggregate data together from multiple parts of the
- * message. For example, it currently aggregates the InterAdapterHeader contents together with
- * the InterContainerHeader contents.
- *
- * Similar to "event listen", the "message listen" command operates in a streaming mode, rather
- * than collecting a list of results and then emitting them at program exit. This is done to
- * facilitate options such as "-F" / "--follow" where the program is intended to operate
- * continuously. This means that automatically calculating column widths is not practical, and
- * a set of Fixed widths (MessageHeaderDefaultWidths) are predefined.
- *
- * As there are multiple kafka topics that can be listened to, specifying a topic is a
- * mandatory positional argument for the `message listen` command. Common topics include:
- * * openolt
- * * brcm_openonu_adapter
- * * rwcore
- * * core-pair-1
- */
-
-const (
- DEFAULT_MESSAGE_FORMAT = "table{{.Id}}\t{{.Type}}\t{{.FromTopic}}\t{{.ToTopic}}\t{{.KeyTopic}}\t{{.InterAdapterType}}"
-)
-
-type MessageListenOpts struct {
- Format string `long:"format" value-name:"FORMAT" default:"" description:"Format to use to output structured data"`
- // nolint: staticcheck
- OutputAs string `short:"o" long:"outputas" default:"table" choice:"table" choice:"json" choice:"yaml" description:"Type of output to generate"`
- Filter string `short:"f" long:"filter" default:"" value-name:"FILTER" description:"Only display results that match filter"`
- Follow bool `short:"F" long:"follow" description:"Continue to consume until CTRL-C is pressed"`
- ShowBody bool `short:"b" long:"show-body" description:"Show body of messages rather than only a header summary"`
- Count int `short:"c" long:"count" default:"-1" value-name:"LIMIT" description:"Limit the count of messages that will be printed"`
- Now bool `short:"n" long:"now" description:"Stop printing messages when current time is reached"`
- Timeout int `short:"t" long:"idle" default:"900" value-name:"SECONDS" description:"Timeout if no message received within specified seconds"`
- Since string `short:"s" long:"since" default:"" value-name:"TIMESTAMP" description:"Do not show entries before timestamp"`
-
- Args struct {
- Topic string
- } `positional-args:"yes" required:"yes"`
-}
-
-type MessageOpts struct {
- MessageListen MessageListenOpts `command:"listen"`
-}
-
-var interAdapterOpts = MessageOpts{}
-
-/* MessageHeader is a set of fields extracted
- * from voltha.MessageHeader as well as useful other
- * places such as InterAdapterHeader. These are fields that
- * will be summarized in list mode and/or can be filtered
- * on.
- */
-type MessageHeader struct {
- Id string `json:"id"`
- Type string `json:"type"`
- FromTopic string `json:"from_topic"`
- ToTopic string `json:"to_topic"`
- KeyTopic string `json:"key_topic"`
- Timestamp time.Time `json:"timestamp"`
- InterAdapterType string `json:"inter_adapter_type"` // interadapter header
- ToDeviceId string `json:"to_device_id"` // interadapter header
- ProxyDeviceId string `json:"proxy_device_id"` //interadapter header
-}
-
-/* Fixed widths because we output in a continuous streaming
- * mode rather than a table-based dump at the end.
- */
-type MessageHeaderWidths struct {
- Id int
- Type int
- FromTopic int
- ToTopic int
- KeyTopic int
- InterAdapterType int
- ToDeviceId int
- ProxyDeviceId int
- Timestamp int
-}
-
-var DefaultMessageWidths MessageHeaderWidths = MessageHeaderWidths{
- Id: 32,
- Type: 10,
- FromTopic: 16,
- ToTopic: 16,
- KeyTopic: 10,
- Timestamp: 10,
- InterAdapterType: 14,
- ToDeviceId: 10,
- ProxyDeviceId: 10,
-}
-
-// jsonpb requires a resolver to resolve Any.Any into proto.Message.
-type VolthaAnyResolver struct{}
-
-func (r *VolthaAnyResolver) Resolve(typeURL string) (proto.Message, error) {
- // TODO: We should be able to get this automatically via reflection using
- // the following commented-out code, but it requires upgrading voltctl to
- // use newer versions of protobuf libraries.
-
- /*
- msgType, err := protoregistry.GlobalTypes.FindMessageByURL(typeURL)
- if err != nil {
- return err
- }
- return msgType.New(), nil*/
-
- // The intercontianer message bus is where we need to map from Any.Any
- // to the appropriate protos when generating json output.
-
- typeURL = strings.TrimPrefix(typeURL, "type.googleapis.com/")
-
- switch typeURL {
- case "voltha.StrType":
- return &inter_container.StrType{}, nil
- case "voltha.IntType":
- return &inter_container.IntType{}, nil
- case "voltha.BoolType":
- return &inter_container.BoolType{}, nil
- case "voltha.Packet":
- return &inter_container.Packet{}, nil
- case "voltha.ErrorCode":
- return &inter_container.ErrorCode{}, nil
- case "voltha.Error":
- return &inter_container.Error{}, nil
- case "voltha.Header":
- return &inter_container.Header{}, nil
- case "voltha.Argument":
- return &inter_container.Argument{}, nil
- case "voltha.InterContainerMessage":
- return &inter_container.InterContainerMessage{}, nil
- case "voltha.InterContainerRequestBody":
- return &inter_container.InterContainerRequestBody{}, nil
- case "voltha.InterContainerResponseBody":
- return &inter_container.InterContainerResponseBody{}, nil
- case "voltha.SwitchCapability":
- return &inter_container.SwitchCapability{}, nil
- case "voltha.DeviceDiscovered":
- return &inter_container.DeviceDiscovered{}, nil
- case "voltha.InterAdapterMessageType":
- return &inter_container.InterAdapterMessageType{}, nil
- case "voltha.InterAdapterOmciMessage":
- return &inter_container.InterAdapterOmciMessage{}, nil
- case "voltha.InterAdapterTechProfileDownloadMessage":
- return &inter_container.InterAdapterTechProfileDownloadMessage{}, nil
- case "voltha.InterAdapterDeleteGemPortMessage":
- return &inter_container.InterAdapterDeleteGemPortMessage{}, nil
- case "voltha.InterAdapterDeleteTcontMessage":
- return &inter_container.InterAdapterDeleteTcontMessage{}, nil
- case "voltha.InterAdapterResponseBody":
- return &inter_container.InterAdapterResponseBody{}, nil
- case "voltha.InterAdapterMessage":
- return &inter_container.InterAdapterMessage{}, nil
- }
-
- return nil, fmt.Errorf("Unknown any type: %s", typeURL)
-}
-
-func RegisterMessageCommands(parent *flags.Parser) {
- if _, err := parent.AddCommand("message", "message commands", "Commands for observing messages between components", &interAdapterOpts); err != nil {
- Error.Fatalf("Unable to register message commands with voltctl command parser: %s", err.Error())
- }
-}
-
-// Extract the header, as well as a few other items that might be of interest
-func DecodeInterContainerHeader(b []byte, ts time.Time) (*MessageHeader, error) {
- m := &inter_container.InterContainerMessage{}
- if err := proto.Unmarshal(b, m); err != nil {
- return nil, err
- }
-
- header := m.Header
- id := header.Id
- msgType := header.Type
- fromTopic := header.FromTopic
- toTopic := header.ToTopic
- keyTopic := header.KeyTopic
- timestamp, err := DecodeTimestamp(header.Timestamp)
- if err != nil {
- return nil, err
- }
-
- // Pull some additional data out of the InterAdapterHeader, if it
- // is embedded inside the InterContainerMessage
-
- var iaMessageTypeStr string
- var toDeviceId string
- var proxyDeviceId string
-
- bodyKind, err := ptypes.AnyMessageName(m.Body)
- if err != nil {
- return nil, err
- }
-
- switch bodyKind {
- case "voltha.InterContainerRequestBody":
- icRequest := &inter_container.InterContainerRequestBody{}
- err := ptypes.UnmarshalAny(m.Body, icRequest)
- if err != nil {
- return nil, err
- }
-
- argList := icRequest.Args
- for _, arg := range argList {
- key := arg.Key
- if key == "msg" {
- argBodyKind, err := ptypes.AnyMessageName(m.Body)
- if err != nil {
- return nil, err
- }
- switch argBodyKind {
- case "voltha.InterAdapterMessage":
- iaMsg := &inter_container.InterAdapterMessage{}
- err := ptypes.UnmarshalAny(arg.Value, iaMsg)
- if err != nil {
- return nil, err
- }
- iaHeader := iaMsg.Header
- iaMessageType := iaHeader.Type
- iaMessageTypeStr = inter_container.InterAdapterMessageType_Types_name[int32(iaMessageType)]
-
- toDeviceId = iaHeader.ToDeviceId
- proxyDeviceId = iaHeader.ProxyDeviceId
- }
- }
- }
- }
-
- messageHeaderType := inter_container.MessageType_name[int32(msgType)]
-
- icHeader := MessageHeader{Id: id,
- Type: messageHeaderType,
- FromTopic: fromTopic,
- ToTopic: toTopic,
- KeyTopic: keyTopic,
- Timestamp: timestamp,
- InterAdapterType: iaMessageTypeStr,
- ProxyDeviceId: proxyDeviceId,
- ToDeviceId: toDeviceId,
- }
-
- return &icHeader, nil
-}
-
-// Print the full message, either in JSON or in GRPCURL-human-readable format,
-// depending on which grpcurl formatter is passed in.
-func PrintInterContainerMessage(outputAs string, b []byte) error {
- ms := &inter_container.InterContainerMessage{}
- if err := proto.Unmarshal(b, ms); err != nil {
- return err
- }
-
- if outputAs == "json" {
- marshaler := jsonpb.Marshaler{EmitDefaults: true, AnyResolver: &VolthaAnyResolver{}}
- asJson, err := marshaler.MarshalToString(ms)
- if err != nil {
- return fmt.Errorf("Failed to marshal the json: %s", err)
- }
- fmt.Println(asJson)
- } else {
- // print in golang native format
- fmt.Printf("%v\n", ms)
- }
-
- return nil
-}
-
-// Print just the enriched InterContainerHeader. This is either in JSON format, or in
-// table format.
-func PrintInterContainerHeader(outputAs string, outputFormat string, hdr *MessageHeader) error {
- if outputAs == "json" {
- asJson, err := json.Marshal(hdr)
- if err != nil {
- return fmt.Errorf("Error marshalling JSON: %v", err)
- } else {
- fmt.Printf("%s\n", asJson)
- }
- } else {
- f := format.Format(outputFormat)
- output, err := f.ExecuteFixedWidth(DefaultMessageWidths, false, *hdr)
- if err != nil {
- return err
- }
- fmt.Printf("%s\n", output)
- }
- return nil
-}
-
-// Start output, print any column headers or other start characters
-func (options *MessageListenOpts) StartOutput(outputFormat string) error {
- if options.OutputAs == "json" {
- fmt.Println("[")
- } else if (options.OutputAs == "table") && !options.ShowBody {
- f := format.Format(outputFormat)
- output, err := f.ExecuteFixedWidth(DefaultMessageWidths, true, nil)
- if err != nil {
- return err
- }
- fmt.Println(output)
- }
- return nil
-}
-
-// Finish output, print any column footers or other end characters
-func (options *MessageListenOpts) FinishOutput() {
- if options.OutputAs == "json" {
- fmt.Println("]")
- }
-}
-
-func (options *MessageListenOpts) Execute(args []string) error {
- ProcessGlobalOptions()
- if GlobalConfig.Current().Kafka == "" {
- return errors.New("Kafka address is not specified")
- }
-
- config := sarama.NewConfig()
- config.ClientID = "go-kafka-consumer"
- config.Consumer.Return.Errors = true
- config.Version = sarama.V1_0_0_0
- brokers := []string{GlobalConfig.Current().Kafka}
-
- client, err := sarama.NewClient(brokers, config)
- if err != nil {
- return err
- }
-
- defer func() {
- if err := client.Close(); err != nil {
- panic(err)
- }
- }()
-
- consumer, consumerErrors, highwaterMarks, err := startInterContainerConsumer([]string{options.Args.Topic}, client)
- if err != nil {
- return err
- }
-
- highwater := highwaterMarks[options.Args.Topic]
-
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
-
- // Count how many message processed
- consumeCount := 0
-
- // Count how many messages were printed
- count := 0
-
- var headerFilter *filter.Filter
- if options.Filter != "" {
- headerFilterVal, err := filter.Parse(options.Filter)
- if err != nil {
- return fmt.Errorf("Failed to parse filter: %v", err)
- }
- headerFilter = &headerFilterVal
- }
-
- outputFormat := CharReplacer.Replace(options.Format)
- if outputFormat == "" {
- outputFormat = GetCommandOptionWithDefault("intercontainer-listen", "format", DEFAULT_MESSAGE_FORMAT)
- }
-
- err = options.StartOutput(outputFormat)
- if err != nil {
- return err
- }
-
- var since *time.Time
- if options.Since != "" {
- since, err = ParseSince(options.Since)
- if err != nil {
- return err
- }
- }
-
- // Get signnal for finish
- doneCh := make(chan struct{})
- go func() {
- tStart := time.Now()
- Loop:
- for {
- // Initialize the idle timeout timer
- timeoutTimer := time.NewTimer(time.Duration(options.Timeout) * time.Second)
- select {
- case msg := <-consumer:
- consumeCount++
- hdr, err := DecodeInterContainerHeader(msg.Value, msg.Timestamp)
- if err != nil {
- log.Printf("Error decoding header %v\n", err)
- continue
- }
-
- match := false
- if headerFilter != nil {
- var err error
- if match, err = headerFilter.Evaluate(*hdr); err != nil {
- log.Printf("%v\n", err)
- }
- } else {
- match = true
- }
-
- if !match {
- // skip printing message
- } else if since != nil && hdr.Timestamp.Before(*since) {
- // it's too old
- } else {
- // comma separated between this message and predecessor
- if count > 0 {
- if options.OutputAs == "json" {
- fmt.Println(",")
- }
- }
-
- // Print it
- if options.ShowBody {
- if err := PrintInterContainerMessage(options.OutputAs, msg.Value); err != nil {
- log.Printf("%v\n", err)
- }
- } else {
- if err := PrintInterContainerHeader(options.OutputAs, outputFormat, hdr); err != nil {
- log.Printf("%v\n", err)
- }
- }
-
- // Check to see if we've hit the "count" threshold the user specified
- count++
- if (options.Count > 0) && (count >= options.Count) {
- log.Println("Count reached")
- doneCh <- struct{}{}
- break Loop
- }
-
- // Check to see if we've hit the "now" threshold the user specified
- if (options.Now) && (!hdr.Timestamp.Before(tStart)) {
- log.Println("Now timestamp reached")
- doneCh <- struct{}{}
- break Loop
- }
- }
-
- // If we're not in follow mode, see if we hit the highwater mark
- if !options.Follow && !options.Now && (msg.Offset >= highwater) {
- log.Println("High water reached")
- doneCh <- struct{}{}
- break Loop
- }
-
- // Reset the timeout timer
- if !timeoutTimer.Stop() {
- <-timeoutTimer.C
- }
- case consumerError := <-consumerErrors:
- log.Printf("Received consumerError topic=%v, partition=%v, err=%v\n", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err)
- doneCh <- struct{}{}
- case <-signals:
- doneCh <- struct{}{}
- case <-timeoutTimer.C:
- log.Printf("Idle timeout\n")
- doneCh <- struct{}{}
- }
- }
- }()
-
- <-doneCh
-
- options.FinishOutput()
-
- log.Printf("Consumed %d messages. Printed %d messages", consumeCount, count)
-
- return nil
-}
-
-// Consume message from Sarama and send them out on a channel.
-// Supports multiple topics.
-// Taken from Sarama example consumer.
-func startInterContainerConsumer(topics []string, client sarama.Client) (chan *sarama.ConsumerMessage, chan *sarama.ConsumerError, map[string]int64, error) {
- master, err := sarama.NewConsumerFromClient(client)
- if err != nil {
- return nil, nil, nil, err
- }
-
- consumers := make(chan *sarama.ConsumerMessage)
- errors := make(chan *sarama.ConsumerError)
- highwater := make(map[string]int64)
- for _, topic := range topics {
- if strings.Contains(topic, "__consumer_offsets") {
- continue
- }
- partitions, _ := master.Partitions(topic)
-
- // TODO: Add support for multiple partitions
- if len(partitions) > 1 {
- log.Printf("WARNING: %d partitions on topic %s but we only listen to the first one\n", len(partitions), topic)
- }
-
- hw, err := client.GetOffset("openolt", partitions[0], sarama.OffsetNewest)
- if err != nil {
- return nil, nil, nil, fmt.Errorf("Error in consume() getting highwater: Topic %v Partitions: %v", topic, partitions)
- }
- highwater[topic] = hw
-
- consumer, err := master.ConsumePartition(topic, partitions[0], sarama.OffsetOldest)
- if nil != err {
- return nil, nil, nil, fmt.Errorf("Error in consume(): Topic %v Partitions: %v", topic, partitions)
- }
- log.Println(" Start consuming topic ", topic)
- go func(topic string, consumer sarama.PartitionConsumer) {
- for {
- select {
- case consumerError := <-consumer.Errors():
- errors <- consumerError
-
- case msg := <-consumer.Messages():
- consumers <- msg
- }
- }
- }(topic, consumer)
- }
-
- return consumers, errors, highwater, nil
-}
diff --git a/internal/pkg/commands/stats.go b/internal/pkg/commands/stats.go
index 8b84a0a..a91f9d0 100644
--- a/internal/pkg/commands/stats.go
+++ b/internal/pkg/commands/stats.go
@@ -18,7 +18,7 @@
import (
"fmt"
"github.com/opencord/voltctl/pkg/model"
- "github.com/opencord/voltha-protos/v4/go/extension"
+ "github.com/opencord/voltha-protos/v5/go/extension"
"strings"
)
diff --git a/internal/pkg/commands/version.go b/internal/pkg/commands/version.go
index 4e3946f..1efac08 100644
--- a/internal/pkg/commands/version.go
+++ b/internal/pkg/commands/version.go
@@ -24,7 +24,7 @@
flags "github.com/jessevdk/go-flags"
"github.com/opencord/voltctl/internal/pkg/cli/version"
"github.com/opencord/voltctl/pkg/format"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
type VersionDetails struct {