Matteo Scandolo | a428586 | 2020-12-01 18:10:10 -0800 | [diff] [blame] | 1 | /* |
| 2 | Copyright 2015 The Kubernetes Authors. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | // Package framer implements simple frame decoding techniques for an io.ReadCloser |
| 18 | package framer |
| 19 | |
| 20 | import ( |
| 21 | "encoding/binary" |
| 22 | "encoding/json" |
| 23 | "io" |
| 24 | ) |
| 25 | |
| 26 | type lengthDelimitedFrameWriter struct { |
| 27 | w io.Writer |
| 28 | h [4]byte |
| 29 | } |
| 30 | |
| 31 | func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer { |
| 32 | return &lengthDelimitedFrameWriter{w: w} |
| 33 | } |
| 34 | |
| 35 | // Write writes a single frame to the nested writer, prepending it with the length in |
| 36 | // in bytes of data (as a 4 byte, bigendian uint32). |
| 37 | func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) { |
| 38 | binary.BigEndian.PutUint32(w.h[:], uint32(len(data))) |
| 39 | n, err := w.w.Write(w.h[:]) |
| 40 | if err != nil { |
| 41 | return 0, err |
| 42 | } |
| 43 | if n != len(w.h) { |
| 44 | return 0, io.ErrShortWrite |
| 45 | } |
| 46 | return w.w.Write(data) |
| 47 | } |
| 48 | |
| 49 | type lengthDelimitedFrameReader struct { |
| 50 | r io.ReadCloser |
| 51 | remaining int |
| 52 | } |
| 53 | |
| 54 | // NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed |
| 55 | // frames off of a stream. |
| 56 | // |
| 57 | // The protocol is: |
| 58 | // |
| 59 | // stream: message ... |
| 60 | // message: prefix body |
| 61 | // prefix: 4 byte uint32 in BigEndian order, denotes length of body |
| 62 | // body: bytes (0..prefix) |
| 63 | // |
| 64 | // If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead |
| 65 | // will be returned along with the number of bytes read. |
| 66 | func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser { |
| 67 | return &lengthDelimitedFrameReader{r: r} |
| 68 | } |
| 69 | |
| 70 | // Read attempts to read an entire frame into data. If that is not possible, io.ErrShortBuffer |
| 71 | // is returned and subsequent calls will attempt to read the last frame. A frame is complete when |
| 72 | // err is nil. |
| 73 | func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) { |
| 74 | if r.remaining <= 0 { |
| 75 | header := [4]byte{} |
| 76 | n, err := io.ReadAtLeast(r.r, header[:4], 4) |
| 77 | if err != nil { |
| 78 | return 0, err |
| 79 | } |
| 80 | if n != 4 { |
| 81 | return 0, io.ErrUnexpectedEOF |
| 82 | } |
| 83 | frameLength := int(binary.BigEndian.Uint32(header[:])) |
| 84 | r.remaining = frameLength |
| 85 | } |
| 86 | |
| 87 | expect := r.remaining |
| 88 | max := expect |
| 89 | if max > len(data) { |
| 90 | max = len(data) |
| 91 | } |
| 92 | n, err := io.ReadAtLeast(r.r, data[:max], int(max)) |
| 93 | r.remaining -= n |
| 94 | if err == io.ErrShortBuffer || r.remaining > 0 { |
| 95 | return n, io.ErrShortBuffer |
| 96 | } |
| 97 | if err != nil { |
| 98 | return n, err |
| 99 | } |
| 100 | if n != expect { |
| 101 | return n, io.ErrUnexpectedEOF |
| 102 | } |
| 103 | |
| 104 | return n, nil |
| 105 | } |
| 106 | |
| 107 | func (r *lengthDelimitedFrameReader) Close() error { |
| 108 | return r.r.Close() |
| 109 | } |
| 110 | |
| 111 | type jsonFrameReader struct { |
| 112 | r io.ReadCloser |
| 113 | decoder *json.Decoder |
| 114 | remaining []byte |
| 115 | } |
| 116 | |
| 117 | // NewJSONFramedReader returns an io.Reader that will decode individual JSON objects off |
| 118 | // of a wire. |
| 119 | // |
| 120 | // The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate |
| 121 | // the read. |
| 122 | func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser { |
| 123 | return &jsonFrameReader{ |
| 124 | r: r, |
| 125 | decoder: json.NewDecoder(r), |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | // ReadFrame decodes the next JSON object in the stream, or returns an error. The returned |
| 130 | // byte slice will be modified the next time ReadFrame is invoked and should not be altered. |
| 131 | func (r *jsonFrameReader) Read(data []byte) (int, error) { |
| 132 | // Return whatever remaining data exists from an in progress frame |
| 133 | if n := len(r.remaining); n > 0 { |
| 134 | if n <= len(data) { |
| 135 | data = append(data[0:0], r.remaining...) |
| 136 | r.remaining = nil |
| 137 | return n, nil |
| 138 | } |
| 139 | |
| 140 | n = len(data) |
| 141 | data = append(data[0:0], r.remaining[:n]...) |
| 142 | r.remaining = r.remaining[n:] |
| 143 | return n, io.ErrShortBuffer |
| 144 | } |
| 145 | |
| 146 | // RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see |
| 147 | // data written to data, or be larger than data and a different array. |
| 148 | n := len(data) |
| 149 | m := json.RawMessage(data[:0]) |
| 150 | if err := r.decoder.Decode(&m); err != nil { |
| 151 | return 0, err |
| 152 | } |
| 153 | |
| 154 | // If capacity of data is less than length of the message, decoder will allocate a new slice |
| 155 | // and set m to it, which means we need to copy the partial result back into data and preserve |
| 156 | // the remaining result for subsequent reads. |
| 157 | if len(m) > n { |
| 158 | data = append(data[0:0], m[:n]...) |
| 159 | r.remaining = m[n:] |
| 160 | return n, io.ErrShortBuffer |
| 161 | } |
| 162 | return len(m), nil |
| 163 | } |
| 164 | |
| 165 | func (r *jsonFrameReader) Close() error { |
| 166 | return r.r.Close() |
| 167 | } |