blob: 0fce0f6dd19f7f2cc66f3d0f32db4e6f83856bf2 [file] [log] [blame]
/*
 * Portions copyright 2019-present Open Networking Foundation
 * Original copyright 2019-present Ciena Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17package commands
18
19import (
20 "context"
21 "crypto/sha1"
22 "github.com/fullstorydev/grpcurl"
23 "github.com/golang/protobuf/proto"
24 "github.com/jhump/protoreflect/dynamic"
25 "google.golang.org/grpc"
26 "hash"
27 "io"
28 "os"
29)
30
31/* Handlers for streaming upload and download */
32
33type DownloadHandler struct {
34 RpcEventHandler
35 f *os.File
36 chunks int
37 bytes int
38 status string
39 hash hash.Hash
40}
41
42type UploadHandler struct {
43 RpcEventHandler
44 chunksize int
45 f *os.File
46 uri string
47}
48
49func (h *DownloadHandler) OnReceiveResponse(m proto.Message) {
50 d, err := dynamic.AsDynamicMessage(m)
51 if err != nil {
52 h.status = "ERROR"
53 // TODO(smbaker): How to raise an exception?
54 return
55 }
56 chunk := d.GetFieldByName("chunk").(string)
57 io.WriteString(h.hash, chunk)
58 h.f.Write([]byte(chunk))
59 h.chunks += 1
60 h.bytes += len(chunk)
61}
62
63func (h *UploadHandler) GetParams(msg proto.Message) error {
64 dmsg, err := dynamic.AsDynamicMessage(msg)
65 if err != nil {
66 return err
67 }
68
69 //fmt.Printf("streamer, MessageName: %s\n", dmsg.XXX_MessageName())
70
71 block := make([]byte, h.chunksize)
72 bytes_read, err := h.f.Read(block)
73
74 if err == io.EOF {
75 h.f.Close()
76 //fmt.Print("EOF\n")
77 return err
78 }
79
80 if err != nil {
81 //fmt.Print("ERROR!\n")
82 return err
83 }
84
85 dmsg.TrySetFieldByName("uri", h.uri)
86 dmsg.TrySetFieldByName("chunk", string(block[:bytes_read]))
87
88 return nil
89}
90
91func UploadFile(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, local_name string, uri string, chunkSize int) (*dynamic.Message, error) {
92 ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
93 defer cancel()
94
95 headers := GenerateHeaders()
96
97 f, err := os.Open(local_name)
98 if err != nil {
99 return nil, err
100 }
101
102 h := &UploadHandler{uri: uri, f: f, chunksize: chunkSize}
103
104 err = grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.filetransfer/Upload", headers, h, h.GetParams)
105 if err != nil {
106 return nil, err
107 }
108 d, err := dynamic.AsDynamicMessage(h.Response)
109 if err != nil {
110 return nil, err
111 }
112
113 return d, err
114}
115
116func DownloadFile(conn *grpc.ClientConn, descriptor grpcurl.DescriptorSource, uri string, local_name string) (*DownloadHandler, error) {
117 ctx, cancel := context.WithTimeout(context.Background(), GlobalConfig.Grpc.Timeout)
118 defer cancel()
119
120 headers := GenerateHeaders()
121
122 f, err := os.Create(local_name)
123 if err != nil {
124 return nil, err
125 }
126
127 dm := make(map[string]interface{})
128 dm["uri"] = uri
129
130 h := &DownloadHandler{
131 RpcEventHandler: RpcEventHandler{
132 Fields: map[string]map[string]interface{}{"xos.FileRequest": dm},
133 },
134 f: f,
135 hash: sha1.New(),
136 status: "SUCCESS"}
137
138 err = grpcurl.InvokeRPC(ctx, descriptor, conn, "xos.filetransfer/Download", headers, h, h.GetParams)
139 if err != nil {
140 return nil, err
141 }
142
143 return h, err
144}