blob: 8a42af3de4204756d81186c1c714bb8238bb8adf [file] [log] [blame]
/*
 * Portions copyright 2019-present Open Networking Foundation
 * Original copyright 2019-present Ciena Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17package commands
18
19import (
20 "context"
21 "errors"
22 "fmt"
23 "github.com/fullstorydev/grpcurl"
24 "github.com/golang/protobuf/proto"
25 flags "github.com/jessevdk/go-flags"
26 "github.com/jhump/protoreflect/dynamic"
27 "github.com/opencord/cordctl/format"
28 "io"
29 "os"
30 "strings"
31)
32
// DEFAULT_TRANSFER_FORMAT is the output template the transfer commands fall
// back to when the user supplies no format of their own. It lists the
// Status, Checksum, Chunks and Bytes fields of a TransferOutput row,
// separated by tabs.
const DEFAULT_TRANSFER_FORMAT = "table{{ .Status }}\t{{ .Checksum }}\t{{ .Chunks }}\t{{ .Bytes }}"
36
// TransferOutput is the single result row printed after an upload or a
// download completes.
type TransferOutput struct {
	// Status is the outcome of the transfer as shown to the user.
	Status string `json:"status"`

	// Checksum is filled in from the upload response; the download command
	// leaves it empty.
	Checksum string `json:"checksum"`

	// Chunks is the number of chunks transferred.
	Chunks int `json:"chunks"`

	// Bytes is the number of bytes transferred.
	Bytes int `json:"bytes"`
}
43
44type TransferUpload struct {
45 OutputOptions
46 ChunkSize int `short:"h" long:"chunksize" default:"65536" description:"Host and port"`
47 Args struct {
48 LocalFileName string
49 URI string
50 } `positional-args:"yes" required:"yes"`
51}
52
53type TransferDownload struct {
54 OutputOptions
55 Args struct {
56 URI string
57 LocalFileName string
58 } `positional-args:"yes" required:"yes"`
59}
60
61type TransferOpts struct {
62 Upload TransferUpload `command:"upload"`
63 Download TransferDownload `command:"download"`
64}
65
66var transferOpts = TransferOpts{}
67
68func RegisterTransferCommands(parser *flags.Parser) {
69 parser.AddCommand("transfer", "file transfer commands", "Commands to transfer files to and from XOS", &transferOpts)
70}
71
/* Handlers for streaming upload and download */
73
74type DownloadHandler struct {
75 RpcEventHandler
76 f *os.File
77 chunks int
78 bytes int
79 status string
80}
81
82type UploadHandler struct {
83 RpcEventHandler
84 chunksize int
85 f *os.File
86 uri string
87}
88
89func (h *DownloadHandler) OnReceiveResponse(m proto.Message) {
90 d, err := dynamic.AsDynamicMessage(m)
91 if err != nil {
92 h.status = "ERROR"
93 // TODO(smbaker): How to raise an exception?
94 return
95 }
96 chunk := d.GetFieldByName("chunk").(string)
97 h.f.Write([]byte(chunk))
98 h.chunks += 1
99 h.bytes += len(chunk)
100}
101
102func (h *UploadHandler) GetParams(msg proto.Message) error {
103 dmsg, err := dynamic.AsDynamicMessage(msg)
104 if err != nil {
105 return err
106 }
107
108 fmt.Printf("streamer, MessageName: %s\n", dmsg.XXX_MessageName())
109
110 block := make([]byte, h.chunksize)
111 bytes_read, err := h.f.Read(block)
112
113 if err == io.EOF {
114 h.f.Close()
115 fmt.Print("EOF\n")
116 return err
117 }
118
119 if err != nil {
120 fmt.Print("ERROR!\n")
121 return err
122 }
123
124 dmsg.TrySetFieldByName("uri", h.uri)
125 dmsg.TrySetFieldByName("chunk", string(block[:bytes_read]))
126
127 return nil
128}
129
/* Command processors */
131
132func (options *TransferUpload) Execute(args []string) error {
133
134 conn, err := NewConnection()
135 if err != nil {
136 return err
137 }
138 defer conn.Close()
139
140 local_name := options.Args.LocalFileName
141 uri := options.Args.URI
142
143 descriptor, method, err := GetReflectionMethod(conn, "xos.filetransfer/Upload")
144 if err != nil {
145 return err
146 }
147
148 ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
149 defer cancel()
150
151 headers := GenerateHeaders()
152
153 f, err := os.Open(local_name)
154 if err != nil {
155 return err
156 }
157
158 h := &UploadHandler{uri: uri, f: f, chunksize: options.ChunkSize}
159
160 err = grpcurl.InvokeRPC(ctx, descriptor, conn, method, headers, h, h.GetParams)
161 if err != nil {
162 return err
163 }
164 d, err := dynamic.AsDynamicMessage(h.Response)
165 if err != nil {
166 return err
167 }
168
169 outputFormat := CharReplacer.Replace(options.Format)
170 if outputFormat == "" {
171 outputFormat = DEFAULT_TRANSFER_FORMAT
172 }
173 if options.Quiet {
174 outputFormat = "{{.Status}}"
175 }
176
177 data := make([]TransferOutput, 1)
178 data[0].Checksum = d.GetFieldByName("checksum").(string)
179 data[0].Chunks = int(d.GetFieldByName("chunks_received").(int32))
180 data[0].Bytes = int(d.GetFieldByName("bytes_received").(int32))
181 data[0].Status = GetEnumValue(d, "status")
182
183 result := CommandResult{
184 Format: format.Format(outputFormat),
185 OutputAs: toOutputType(options.OutputAs),
186 Data: data,
187 }
188
189 GenerateOutput(&result)
190
191 return nil
192}
193
// IsFileUri reports whether s starts with the "file://" prefix. The match is
// case-sensitive and does not tolerate leading whitespace.
func IsFileUri(s string) bool {
	const scheme = "file://"

	// Trimming a non-empty prefix shortens the string exactly when the
	// prefix is present.
	remainder := strings.TrimPrefix(s, scheme)
	return len(remainder) != len(s)
}
197
198func (options *TransferDownload) Execute(args []string) error {
199
200 conn, err := NewConnection()
201 if err != nil {
202 return err
203 }
204 defer conn.Close()
205
206 local_name := options.Args.LocalFileName
207 uri := options.Args.URI
208
209 if IsFileUri(local_name) {
210 return errors.New("local_name argument should not be a uri")
211 }
212
213 if !IsFileUri(uri) {
214 return errors.New("uri argument should be a file:// uri")
215 }
216
217 descriptor, method, err := GetReflectionMethod(conn, "xos.filetransfer/Download")
218 if err != nil {
219 return err
220 }
221
222 ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
223 defer cancel()
224
225 headers := GenerateHeaders()
226
227 f, err := os.Create(local_name)
228 if err != nil {
229 return err
230 }
231
232 dm := make(map[string]interface{})
233 dm["uri"] = uri
234
235 h := &DownloadHandler{
236 RpcEventHandler: RpcEventHandler{
237 Fields: map[string]map[string]interface{}{"xos.FileRequest": dm},
238 },
239 f: f,
240 chunks: 0,
241 bytes: 0,
242 status: "SUCCESS"}
243
244 err = grpcurl.InvokeRPC(ctx, descriptor, conn, method, headers, h, h.GetParams)
245 if err != nil {
246 return err
247 }
248
249 outputFormat := CharReplacer.Replace(options.Format)
250 if outputFormat == "" {
251 outputFormat = DEFAULT_TRANSFER_FORMAT
252 }
253 if options.Quiet {
254 outputFormat = "{{.Status}}"
255 }
256
257 data := make([]TransferOutput, 1)
258 data[0].Chunks = h.chunks
259 data[0].Bytes = h.bytes
260 data[0].Status = h.status
261
262 result := CommandResult{
263 Format: format.Format(outputFormat),
264 OutputAs: toOutputType(options.OutputAs),
265 Data: data,
266 }
267
268 GenerateOutput(&result)
269
270 return nil
271}