Matteo Scandolo | a428586 | 2020-12-01 18:10:10 -0800 | [diff] [blame^] | 1 | /* |
| 2 | Copyright 2014 The Kubernetes Authors. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package yaml |
| 18 | |
| 19 | import ( |
| 20 | "bufio" |
| 21 | "bytes" |
| 22 | "encoding/json" |
| 23 | "fmt" |
| 24 | "io" |
| 25 | "io/ioutil" |
| 26 | "strings" |
| 27 | "unicode" |
| 28 | |
| 29 | "k8s.io/klog/v2" |
| 30 | "sigs.k8s.io/yaml" |
| 31 | ) |
| 32 | |
| 33 | // ToJSON converts a single YAML document into a JSON document |
| 34 | // or returns an error. If the document appears to be JSON the |
| 35 | // YAML decoding path is not used (so that error messages are |
| 36 | // JSON specific). |
| 37 | func ToJSON(data []byte) ([]byte, error) { |
| 38 | if hasJSONPrefix(data) { |
| 39 | return data, nil |
| 40 | } |
| 41 | return yaml.YAMLToJSON(data) |
| 42 | } |
| 43 | |
| 44 | // YAMLToJSONDecoder decodes YAML documents from an io.Reader by |
| 45 | // separating individual documents. It first converts the YAML |
| 46 | // body to JSON, then unmarshals the JSON. |
| 47 | type YAMLToJSONDecoder struct { |
| 48 | reader Reader |
| 49 | } |
| 50 | |
| 51 | // NewYAMLToJSONDecoder decodes YAML documents from the provided |
| 52 | // stream in chunks by converting each document (as defined by |
| 53 | // the YAML spec) into its own chunk, converting it to JSON via |
| 54 | // yaml.YAMLToJSON, and then passing it to json.Decoder. |
| 55 | func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder { |
| 56 | reader := bufio.NewReader(r) |
| 57 | return &YAMLToJSONDecoder{ |
| 58 | reader: NewYAMLReader(reader), |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | // Decode reads a YAML document as JSON from the stream or returns |
| 63 | // an error. The decoding rules match json.Unmarshal, not |
| 64 | // yaml.Unmarshal. |
| 65 | func (d *YAMLToJSONDecoder) Decode(into interface{}) error { |
| 66 | bytes, err := d.reader.Read() |
| 67 | if err != nil && err != io.EOF { |
| 68 | return err |
| 69 | } |
| 70 | |
| 71 | if len(bytes) != 0 { |
| 72 | err := yaml.Unmarshal(bytes, into) |
| 73 | if err != nil { |
| 74 | return YAMLSyntaxError{err} |
| 75 | } |
| 76 | } |
| 77 | return err |
| 78 | } |
| 79 | |
// YAMLDecoder reads a stream one YAML document at a time and reports
// io.ErrShortBuffer when the caller's buffer cannot hold the whole
// document.
type YAMLDecoder struct {
	// r is the underlying stream; it is closed by Close.
	r io.ReadCloser
	// scanner splits the stream into documents.
	scanner *bufio.Scanner
	// remaining holds the part of the current document that has not yet
	// been copied out to a caller.
	remaining []byte
}
| 87 | |
| 88 | // NewDocumentDecoder decodes YAML documents from the provided |
| 89 | // stream in chunks by converting each document (as defined by |
| 90 | // the YAML spec) into its own chunk. io.ErrShortBuffer will be |
| 91 | // returned if the entire buffer could not be read to assist |
| 92 | // the caller in framing the chunk. |
| 93 | func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser { |
| 94 | scanner := bufio.NewScanner(r) |
| 95 | // the size of initial allocation for buffer 4k |
| 96 | buf := make([]byte, 4*1024) |
| 97 | // the maximum size used to buffer a token 5M |
| 98 | scanner.Buffer(buf, 5*1024*1024) |
| 99 | scanner.Split(splitYAMLDocument) |
| 100 | return &YAMLDecoder{ |
| 101 | r: r, |
| 102 | scanner: scanner, |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | // Read reads the previous slice into the buffer, or attempts to read |
| 107 | // the next chunk. |
| 108 | // TODO: switch to readline approach. |
| 109 | func (d *YAMLDecoder) Read(data []byte) (n int, err error) { |
| 110 | left := len(d.remaining) |
| 111 | if left == 0 { |
| 112 | // return the next chunk from the stream |
| 113 | if !d.scanner.Scan() { |
| 114 | err := d.scanner.Err() |
| 115 | if err == nil { |
| 116 | err = io.EOF |
| 117 | } |
| 118 | return 0, err |
| 119 | } |
| 120 | out := d.scanner.Bytes() |
| 121 | d.remaining = out |
| 122 | left = len(out) |
| 123 | } |
| 124 | |
| 125 | // fits within data |
| 126 | if left <= len(data) { |
| 127 | copy(data, d.remaining) |
| 128 | d.remaining = nil |
| 129 | return left, nil |
| 130 | } |
| 131 | |
| 132 | // caller will need to reread |
| 133 | copy(data, d.remaining[:len(data)]) |
| 134 | d.remaining = d.remaining[len(data):] |
| 135 | return len(data), io.ErrShortBuffer |
| 136 | } |
| 137 | |
| 138 | func (d *YAMLDecoder) Close() error { |
| 139 | return d.r.Close() |
| 140 | } |
| 141 | |
const (
	// yamlSeparator is the document separator as it appears inside a
	// stream: a newline followed by three dashes.
	yamlSeparator = "\n---"
	// separator is the bare three-dash marker at the start of a line.
	separator = "---"
)

// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into
// individual documents.
//
// A document ends at the first occurrence of yamlSeparator; the rest of
// the separator line, up to and including its newline, is consumed and
// discarded. While more input may still arrive (atEOF is false) the
// function asks for more data until that terminating newline is visible.
// At EOF, whatever precedes the separator is returned as the last
// document even if the separator line is not newline-terminated;
// otherwise the scanner would drop the document. Data without any
// separator is returned whole at EOF.
func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}

	sepAt := bytes.Index(data, []byte(yamlSeparator))
	if sepAt < 0 {
		// If we're at EOF, we have a final, non-terminated document. Return it.
		if atEOF {
			return len(data), data, nil
		}
		// Request more data.
		return 0, nil, nil
	}

	// We have a potential document terminator.
	doc := data[:sepAt]
	afterSep := sepAt + len(yamlSeparator)
	if eol := bytes.IndexByte(data[afterSep:], '\n'); eol >= 0 {
		// Skip the remainder of the separator line along with its newline.
		return afterSep + eol + 1, doc, nil
	}

	// The separator line is not complete yet. At EOF no more bytes will
	// arrive, so emit the document now instead of letting it be dropped.
	if atEOF {
		return len(data), doc, nil
	}
	return 0, nil, nil
}
| 174 | |
// decoder is a convenience interface describing anything that offers a
// Decode method.
type decoder interface {
	// Decode decodes the next value into the given destination.
	Decode(into interface{}) error
}
| 179 | |
| 180 | // YAMLOrJSONDecoder attempts to decode a stream of JSON documents or |
| 181 | // YAML documents by sniffing for a leading { character. |
| 182 | type YAMLOrJSONDecoder struct { |
| 183 | r io.Reader |
| 184 | bufferSize int |
| 185 | |
| 186 | decoder decoder |
| 187 | rawData []byte |
| 188 | } |
| 189 | |
// JSONSyntaxError is a JSON syntax error annotated with the line it was
// attributed to.
type JSONSyntaxError struct {
	// Line is the line number computed for the error.
	Line int
	// Err is the underlying error.
	Err error
}

// Error formats the error as "json: line <Line>: <underlying message>".
func (e JSONSyntaxError) Error() string {
	cause := e.Err.Error()
	return fmt.Sprintf("json: line %d: %s", e.Line, cause)
}
| 198 | |
// YAMLSyntaxError wraps an error produced while unmarshalling a YAML
// document.
type YAMLSyntaxError struct {
	// err is the wrapped error.
	err error
}

// Error returns the message of the wrapped error unchanged.
func (e YAMLSyntaxError) Error() string {
	wrapped := e.err
	return wrapped.Error()
}
| 206 | |
| 207 | // NewYAMLOrJSONDecoder returns a decoder that will process YAML documents |
| 208 | // or JSON documents from the given reader as a stream. bufferSize determines |
| 209 | // how far into the stream the decoder will look to figure out whether this |
| 210 | // is a JSON stream (has whitespace followed by an open brace). |
| 211 | func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder { |
| 212 | return &YAMLOrJSONDecoder{ |
| 213 | r: r, |
| 214 | bufferSize: bufferSize, |
| 215 | } |
| 216 | } |
| 217 | |
| 218 | // Decode unmarshals the next object from the underlying stream into the |
| 219 | // provide object, or returns an error. |
| 220 | func (d *YAMLOrJSONDecoder) Decode(into interface{}) error { |
| 221 | if d.decoder == nil { |
| 222 | buffer, origData, isJSON := GuessJSONStream(d.r, d.bufferSize) |
| 223 | if isJSON { |
| 224 | d.decoder = json.NewDecoder(buffer) |
| 225 | d.rawData = origData |
| 226 | } else { |
| 227 | d.decoder = NewYAMLToJSONDecoder(buffer) |
| 228 | } |
| 229 | } |
| 230 | err := d.decoder.Decode(into) |
| 231 | if jsonDecoder, ok := d.decoder.(*json.Decoder); ok { |
| 232 | if syntax, ok := err.(*json.SyntaxError); ok { |
| 233 | data, readErr := ioutil.ReadAll(jsonDecoder.Buffered()) |
| 234 | if readErr != nil { |
| 235 | klog.V(4).Infof("reading stream failed: %v", readErr) |
| 236 | } |
| 237 | js := string(data) |
| 238 | |
| 239 | // if contents from io.Reader are not complete, |
| 240 | // use the original raw data to prevent panic |
| 241 | if int64(len(js)) <= syntax.Offset { |
| 242 | js = string(d.rawData) |
| 243 | } |
| 244 | |
| 245 | start := strings.LastIndex(js[:syntax.Offset], "\n") + 1 |
| 246 | line := strings.Count(js[:start], "\n") |
| 247 | return JSONSyntaxError{ |
| 248 | Line: line, |
| 249 | Err: fmt.Errorf(syntax.Error()), |
| 250 | } |
| 251 | } |
| 252 | } |
| 253 | return err |
| 254 | } |
| 255 | |
// Reader is implemented by types that hand back one chunk of bytes per
// call to Read.
type Reader interface {
	// Read returns the next chunk, or an error.
	Read() ([]byte, error)
}
| 259 | |
| 260 | type YAMLReader struct { |
| 261 | reader Reader |
| 262 | } |
| 263 | |
| 264 | func NewYAMLReader(r *bufio.Reader) *YAMLReader { |
| 265 | return &YAMLReader{ |
| 266 | reader: &LineReader{reader: r}, |
| 267 | } |
| 268 | } |
| 269 | |
| 270 | // Read returns a full YAML document. |
| 271 | func (r *YAMLReader) Read() ([]byte, error) { |
| 272 | var buffer bytes.Buffer |
| 273 | for { |
| 274 | line, err := r.reader.Read() |
| 275 | if err != nil && err != io.EOF { |
| 276 | return nil, err |
| 277 | } |
| 278 | |
| 279 | sep := len([]byte(separator)) |
| 280 | if i := bytes.Index(line, []byte(separator)); i == 0 { |
| 281 | // We have a potential document terminator |
| 282 | i += sep |
| 283 | after := line[i:] |
| 284 | if len(strings.TrimRightFunc(string(after), unicode.IsSpace)) == 0 { |
| 285 | if buffer.Len() != 0 { |
| 286 | return buffer.Bytes(), nil |
| 287 | } |
| 288 | if err == io.EOF { |
| 289 | return nil, err |
| 290 | } |
| 291 | } |
| 292 | } |
| 293 | if err == io.EOF { |
| 294 | if buffer.Len() != 0 { |
| 295 | // If we're at EOF, we have a final, non-terminated line. Return it. |
| 296 | return buffer.Bytes(), nil |
| 297 | } |
| 298 | return nil, err |
| 299 | } |
| 300 | buffer.Write(line) |
| 301 | } |
| 302 | } |
| 303 | |
// LineReader reads its input one line at a time.
type LineReader struct {
	// reader is the buffered source the lines are read from.
	reader *bufio.Reader
}

// Read returns a single line (with '\n' ended) from the underlying reader.
// Lines longer than the reader's buffer are stitched together from their
// fragments. A '\n' is always appended to the result, including when the
// underlying reader reports an error. An error is returned iff there is an
// error with the underlying reader.
func (r *LineReader) Read() ([]byte, error) {
	var assembled bytes.Buffer
	for {
		fragment, more, err := r.reader.ReadLine()
		assembled.Write(fragment)
		if more && err == nil {
			// The line did not fit in one fragment: keep reading.
			continue
		}
		assembled.WriteByte('\n')
		return assembled.Bytes(), err
	}
}
| 325 | |
| 326 | // GuessJSONStream scans the provided reader up to size, looking |
| 327 | // for an open brace indicating this is JSON. It will return the |
| 328 | // bufio.Reader it creates for the consumer. |
| 329 | func GuessJSONStream(r io.Reader, size int) (io.Reader, []byte, bool) { |
| 330 | buffer := bufio.NewReaderSize(r, size) |
| 331 | b, _ := buffer.Peek(size) |
| 332 | return buffer, b, hasJSONPrefix(b) |
| 333 | } |
| 334 | |
| 335 | var jsonPrefix = []byte("{") |
| 336 | |
| 337 | // hasJSONPrefix returns true if the provided buffer appears to start with |
| 338 | // a JSON open brace. |
| 339 | func hasJSONPrefix(buf []byte) bool { |
| 340 | return hasPrefix(buf, jsonPrefix) |
| 341 | } |
| 342 | |
// hasPrefix reports whether the first non-whitespace bytes in buf are
// prefix.
func hasPrefix(buf []byte, prefix []byte) bool {
	return bytes.HasPrefix(bytes.TrimLeftFunc(buf, unicode.IsSpace), prefix)
}