SEBA-580 cordctl CLI for XOS
Change-Id: I807821a12d3d0fe4aee89791e2fe46e973da8eda
diff --git a/commands/transfer.go b/commands/transfer.go
new file mode 100644
index 0000000..8a42af3
--- /dev/null
+++ b/commands/transfer.go
@@ -0,0 +1,271 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package commands
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/fullstorydev/grpcurl"
+ "github.com/golang/protobuf/proto"
+ flags "github.com/jessevdk/go-flags"
+ "github.com/jhump/protoreflect/dynamic"
+ "github.com/opencord/cordctl/format"
+ "io"
+ "os"
+ "strings"
+)
+
+const (
+ DEFAULT_TRANSFER_FORMAT = "table{{ .Status }}\t{{ .Checksum }}\t{{ .Chunks }}\t{{ .Bytes }}"
+)
+
+type TransferOutput struct {
+ Status string `json:"status"`
+ Checksum string `json:"checksum"`
+ Chunks int `json:"chunks"`
+ Bytes int `json:"bytes"`
+}
+
+type TransferUpload struct {
+ OutputOptions
+ ChunkSize int `short:"h" long:"chunksize" default:"65536" description:"Host and port"`
+ Args struct {
+ LocalFileName string
+ URI string
+ } `positional-args:"yes" required:"yes"`
+}
+
+type TransferDownload struct {
+ OutputOptions
+ Args struct {
+ URI string
+ LocalFileName string
+ } `positional-args:"yes" required:"yes"`
+}
+
+type TransferOpts struct {
+ Upload TransferUpload `command:"upload"`
+ Download TransferDownload `command:"download"`
+}
+
+var transferOpts = TransferOpts{}
+
+func RegisterTransferCommands(parser *flags.Parser) {
+ parser.AddCommand("transfer", "file transfer commands", "Commands to transfer files to and from XOS", &transferOpts)
+}
+
+/* Handlers for streaming upload and download */
+
+type DownloadHandler struct {
+ RpcEventHandler
+ f *os.File
+ chunks int
+ bytes int
+ status string
+}
+
+type UploadHandler struct {
+ RpcEventHandler
+ chunksize int
+ f *os.File
+ uri string
+}
+
+func (h *DownloadHandler) OnReceiveResponse(m proto.Message) {
+ d, err := dynamic.AsDynamicMessage(m)
+ if err != nil {
+ h.status = "ERROR"
+ // TODO(smbaker): How to raise an exception?
+ return
+ }
+ chunk := d.GetFieldByName("chunk").(string)
+ h.f.Write([]byte(chunk))
+ h.chunks += 1
+ h.bytes += len(chunk)
+}
+
+func (h *UploadHandler) GetParams(msg proto.Message) error {
+ dmsg, err := dynamic.AsDynamicMessage(msg)
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("streamer, MessageName: %s\n", dmsg.XXX_MessageName())
+
+ block := make([]byte, h.chunksize)
+ bytes_read, err := h.f.Read(block)
+
+ if err == io.EOF {
+ h.f.Close()
+ fmt.Print("EOF\n")
+ return err
+ }
+
+ if err != nil {
+ fmt.Print("ERROR!\n")
+ return err
+ }
+
+ dmsg.TrySetFieldByName("uri", h.uri)
+ dmsg.TrySetFieldByName("chunk", string(block[:bytes_read]))
+
+ return nil
+}
+
+/* Command processors */
+
+func (options *TransferUpload) Execute(args []string) error {
+
+ conn, err := NewConnection()
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+
+ local_name := options.Args.LocalFileName
+ uri := options.Args.URI
+
+ descriptor, method, err := GetReflectionMethod(conn, "xos.filetransfer/Upload")
+ if err != nil {
+ return err
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ f, err := os.Open(local_name)
+ if err != nil {
+ return err
+ }
+
+ h := &UploadHandler{uri: uri, f: f, chunksize: options.ChunkSize}
+
+ err = grpcurl.InvokeRPC(ctx, descriptor, conn, method, headers, h, h.GetParams)
+ if err != nil {
+ return err
+ }
+ d, err := dynamic.AsDynamicMessage(h.Response)
+ if err != nil {
+ return err
+ }
+
+ outputFormat := CharReplacer.Replace(options.Format)
+ if outputFormat == "" {
+ outputFormat = DEFAULT_TRANSFER_FORMAT
+ }
+ if options.Quiet {
+ outputFormat = "{{.Status}}"
+ }
+
+ data := make([]TransferOutput, 1)
+ data[0].Checksum = d.GetFieldByName("checksum").(string)
+ data[0].Chunks = int(d.GetFieldByName("chunks_received").(int32))
+ data[0].Bytes = int(d.GetFieldByName("bytes_received").(int32))
+ data[0].Status = GetEnumValue(d, "status")
+
+ result := CommandResult{
+ Format: format.Format(outputFormat),
+ OutputAs: toOutputType(options.OutputAs),
+ Data: data,
+ }
+
+ GenerateOutput(&result)
+
+ return nil
+}
+
+func IsFileUri(s string) bool {
+ return strings.HasPrefix(s, "file://")
+}
+
+func (options *TransferDownload) Execute(args []string) error {
+
+ conn, err := NewConnection()
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+
+ local_name := options.Args.LocalFileName
+ uri := options.Args.URI
+
+ if IsFileUri(local_name) {
+ return errors.New("local_name argument should not be a uri")
+ }
+
+ if !IsFileUri(uri) {
+ return errors.New("uri argument should be a file:// uri")
+ }
+
+ descriptor, method, err := GetReflectionMethod(conn, "xos.filetransfer/Download")
+ if err != nil {
+ return err
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
+ defer cancel()
+
+ headers := GenerateHeaders()
+
+ f, err := os.Create(local_name)
+ if err != nil {
+ return err
+ }
+
+ dm := make(map[string]interface{})
+ dm["uri"] = uri
+
+ h := &DownloadHandler{
+ RpcEventHandler: RpcEventHandler{
+ Fields: map[string]map[string]interface{}{"xos.FileRequest": dm},
+ },
+ f: f,
+ chunks: 0,
+ bytes: 0,
+ status: "SUCCESS"}
+
+ err = grpcurl.InvokeRPC(ctx, descriptor, conn, method, headers, h, h.GetParams)
+ if err != nil {
+ return err
+ }
+
+ outputFormat := CharReplacer.Replace(options.Format)
+ if outputFormat == "" {
+ outputFormat = DEFAULT_TRANSFER_FORMAT
+ }
+ if options.Quiet {
+ outputFormat = "{{.Status}}"
+ }
+
+ data := make([]TransferOutput, 1)
+ data[0].Chunks = h.chunks
+ data[0].Bytes = h.bytes
+ data[0].Status = h.status
+
+ result := CommandResult{
+ Format: format.Format(outputFormat),
+ OutputAs: toOutputType(options.OutputAs),
+ Data: data,
+ }
+
+ GenerateOutput(&result)
+
+ return nil
+}