/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package yaml
18
19import (
20 "bufio"
21 "bytes"
22 "encoding/json"
23 "fmt"
24 "io"
25 "io/ioutil"
26 "strings"
27 "unicode"
28
29 "k8s.io/klog/v2"
30 "sigs.k8s.io/yaml"
31)
32
33// ToJSON converts a single YAML document into a JSON document
34// or returns an error. If the document appears to be JSON the
35// YAML decoding path is not used (so that error messages are
36// JSON specific).
37func ToJSON(data []byte) ([]byte, error) {
38 if hasJSONPrefix(data) {
39 return data, nil
40 }
41 return yaml.YAMLToJSON(data)
42}
43
44// YAMLToJSONDecoder decodes YAML documents from an io.Reader by
45// separating individual documents. It first converts the YAML
46// body to JSON, then unmarshals the JSON.
47type YAMLToJSONDecoder struct {
48 reader Reader
49}
50
51// NewYAMLToJSONDecoder decodes YAML documents from the provided
52// stream in chunks by converting each document (as defined by
53// the YAML spec) into its own chunk, converting it to JSON via
54// yaml.YAMLToJSON, and then passing it to json.Decoder.
55func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
56 reader := bufio.NewReader(r)
57 return &YAMLToJSONDecoder{
58 reader: NewYAMLReader(reader),
59 }
60}
61
62// Decode reads a YAML document as JSON from the stream or returns
63// an error. The decoding rules match json.Unmarshal, not
64// yaml.Unmarshal.
65func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
66 bytes, err := d.reader.Read()
67 if err != nil && err != io.EOF {
68 return err
69 }
70
71 if len(bytes) != 0 {
72 err := yaml.Unmarshal(bytes, into)
73 if err != nil {
74 return YAMLSyntaxError{err}
75 }
76 }
77 return err
78}
79
80// YAMLDecoder reads chunks of objects and returns ErrShortBuffer if
81// the data is not sufficient.
82type YAMLDecoder struct {
83 r io.ReadCloser
84 scanner *bufio.Scanner
85 remaining []byte
86}
87
88// NewDocumentDecoder decodes YAML documents from the provided
89// stream in chunks by converting each document (as defined by
90// the YAML spec) into its own chunk. io.ErrShortBuffer will be
91// returned if the entire buffer could not be read to assist
92// the caller in framing the chunk.
93func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser {
94 scanner := bufio.NewScanner(r)
95 // the size of initial allocation for buffer 4k
96 buf := make([]byte, 4*1024)
97 // the maximum size used to buffer a token 5M
98 scanner.Buffer(buf, 5*1024*1024)
99 scanner.Split(splitYAMLDocument)
100 return &YAMLDecoder{
101 r: r,
102 scanner: scanner,
103 }
104}
105
106// Read reads the previous slice into the buffer, or attempts to read
107// the next chunk.
108// TODO: switch to readline approach.
109func (d *YAMLDecoder) Read(data []byte) (n int, err error) {
110 left := len(d.remaining)
111 if left == 0 {
112 // return the next chunk from the stream
113 if !d.scanner.Scan() {
114 err := d.scanner.Err()
115 if err == nil {
116 err = io.EOF
117 }
118 return 0, err
119 }
120 out := d.scanner.Bytes()
121 d.remaining = out
122 left = len(out)
123 }
124
125 // fits within data
126 if left <= len(data) {
127 copy(data, d.remaining)
128 d.remaining = nil
129 return left, nil
130 }
131
132 // caller will need to reread
133 copy(data, d.remaining[:len(data)])
134 d.remaining = d.remaining[len(data):]
135 return len(data), io.ErrShortBuffer
136}
137
138func (d *YAMLDecoder) Close() error {
139 return d.r.Close()
140}
141
// yamlSeparator is the byte sequence that starts a document separator
// line inside a stream: a newline followed by three dashes.
const yamlSeparator = "\n---"

// separator is the document separator as it appears at the start of a line.
const separator = "---"

// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into
// individual documents.
//
// A document ends at the first occurrence of "\n---"; everything from
// there up to and including the next newline is consumed as the
// separator line and is not part of any token. At EOF, a separator
// line that is not newline-terminated (for example a trailing "--- ")
// still terminates the document before it, so that document is
// returned rather than dropped. With no separator in data, the
// function asks for more input, or at EOF returns all of data as the
// final document.
func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}

	if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 {
		// We have a potential document terminator; data[:i] is the document.
		afterSep := i + len(yamlSeparator)
		if j := bytes.IndexByte(data[afterSep:], '\n'); j >= 0 {
			// Skip the whole separator line, including its newline.
			return afterSep + j + 1, data[:i], nil
		}
		if atEOF {
			// No more input will arrive: the unterminated separator
			// line ends the stream, so emit the document before it.
			return len(data), data[:i], nil
		}
		// The separator line is not complete yet; request more data.
		return 0, nil, nil
	}

	// If we're at EOF, we have a final, non-terminated document. Return it.
	if atEOF {
		return len(data), data, nil
	}
	// Request more data.
	return 0, nil, nil
}
174
// decoder is the single-method contract the stream decoders in this
// file share: Decode fills into from the next document.
type decoder interface {
	Decode(into interface{}) error
}

// YAMLOrJSONDecoder attempts to decode a stream of JSON documents or
// YAML documents by sniffing for a leading { character.
type YAMLOrJSONDecoder struct {
	// r is the stream to decode.
	r io.Reader
	// bufferSize is how many bytes of r are inspected when sniffing.
	bufferSize int

	// decoder is chosen on the first Decode call and reused afterwards.
	decoder decoder
	// rawData is the sniffed prefix of the stream; it is set only when
	// the stream was detected as JSON.
	rawData []byte
}
189
// JSONSyntaxError describes a JSON syntax error together with the line
// it was attributed to.
type JSONSyntaxError struct {
	// Line is the line number reported in the message.
	Line int
	// Err is the underlying syntax error.
	Err error
}

// Error implements the error interface. The message has the form
// "json: line <Line>: <Err>". Err is formatted with %v rather than by
// calling Err.Error() directly, so a value whose Err is nil renders as
// "<nil>" instead of panicking.
func (e JSONSyntaxError) Error() string {
	return fmt.Sprintf("json: line %d: %v", e.Line, e.Err)
}
198
// YAMLSyntaxError wraps an error produced while unmarshalling a YAML
// document.
type YAMLSyntaxError struct {
	// err is the wrapped unmarshal error.
	err error
}

// Error implements the error interface by returning the wrapped
// error's message. The zero value, which wraps no error, reports
// "<nil>" instead of panicking on a nil dereference.
func (e YAMLSyntaxError) Error() string {
	if e.err == nil {
		return "<nil>"
	}
	return e.err.Error()
}
206
207// NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
208// or JSON documents from the given reader as a stream. bufferSize determines
209// how far into the stream the decoder will look to figure out whether this
210// is a JSON stream (has whitespace followed by an open brace).
211func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
212 return &YAMLOrJSONDecoder{
213 r: r,
214 bufferSize: bufferSize,
215 }
216}
217
218// Decode unmarshals the next object from the underlying stream into the
219// provide object, or returns an error.
220func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
221 if d.decoder == nil {
222 buffer, origData, isJSON := GuessJSONStream(d.r, d.bufferSize)
223 if isJSON {
224 d.decoder = json.NewDecoder(buffer)
225 d.rawData = origData
226 } else {
227 d.decoder = NewYAMLToJSONDecoder(buffer)
228 }
229 }
230 err := d.decoder.Decode(into)
231 if jsonDecoder, ok := d.decoder.(*json.Decoder); ok {
232 if syntax, ok := err.(*json.SyntaxError); ok {
233 data, readErr := ioutil.ReadAll(jsonDecoder.Buffered())
234 if readErr != nil {
235 klog.V(4).Infof("reading stream failed: %v", readErr)
236 }
237 js := string(data)
238
239 // if contents from io.Reader are not complete,
240 // use the original raw data to prevent panic
241 if int64(len(js)) <= syntax.Offset {
242 js = string(d.rawData)
243 }
244
245 start := strings.LastIndex(js[:syntax.Offset], "\n") + 1
246 line := strings.Count(js[:start], "\n")
247 return JSONSyntaxError{
248 Line: line,
249 Err: fmt.Errorf(syntax.Error()),
250 }
251 }
252 }
253 return err
254}
255
// Reader is a chunk-oriented reader: each Read call returns the next
// chunk as a byte slice, or an error. The implementations in this file
// return one line (LineReader) or one YAML document (YAMLReader) per
// call.
type Reader interface {
	Read() ([]byte, error)
}
259
260type YAMLReader struct {
261 reader Reader
262}
263
264func NewYAMLReader(r *bufio.Reader) *YAMLReader {
265 return &YAMLReader{
266 reader: &LineReader{reader: r},
267 }
268}
269
270// Read returns a full YAML document.
271func (r *YAMLReader) Read() ([]byte, error) {
272 var buffer bytes.Buffer
273 for {
274 line, err := r.reader.Read()
275 if err != nil && err != io.EOF {
276 return nil, err
277 }
278
279 sep := len([]byte(separator))
280 if i := bytes.Index(line, []byte(separator)); i == 0 {
281 // We have a potential document terminator
282 i += sep
283 after := line[i:]
284 if len(strings.TrimRightFunc(string(after), unicode.IsSpace)) == 0 {
285 if buffer.Len() != 0 {
286 return buffer.Bytes(), nil
287 }
288 if err == io.EOF {
289 return nil, err
290 }
291 }
292 }
293 if err == io.EOF {
294 if buffer.Len() != 0 {
295 // If we're at EOF, we have a final, non-terminated line. Return it.
296 return buffer.Bytes(), nil
297 }
298 return nil, err
299 }
300 buffer.Write(line)
301 }
302}
303
// LineReader reads a stream one line at a time, however long the line.
type LineReader struct {
	// reader is the buffered stream the lines are read from.
	reader *bufio.Reader
}

// Read returns a single line (with '\n' ended) from the underlying reader.
//
// Lines longer than the bufio buffer are reassembled from the
// fragments that ReadLine reports with isPrefix set. The line ending
// is stripped by ReadLine and a single '\n' is appended.
//
// When the stream is exhausted, Read returns "\n" together with io.EOF.
// If io.EOF arrives after part of a line has already been collected
// (a final unterminated line that exactly filled the buffer), the
// collected line is returned with a nil error and io.EOF is reported
// by the next call, so a caller that ignores data delivered with
// io.EOF does not lose it. Any other error is returned along with
// whatever was collected so far.
func (r *LineReader) Read() ([]byte, error) {
	var buffer bytes.Buffer
	for {
		chunk, isPrefix, err := r.reader.ReadLine()
		buffer.Write(chunk)
		if err == nil && isPrefix {
			// The line continues beyond the bufio buffer; keep reading.
			continue
		}
		if err == io.EOF && buffer.Len() > 0 {
			// Deliver the collected line now; EOF surfaces on the next call.
			err = nil
		}
		buffer.WriteByte('\n')
		return buffer.Bytes(), err
	}
}
325
326// GuessJSONStream scans the provided reader up to size, looking
327// for an open brace indicating this is JSON. It will return the
328// bufio.Reader it creates for the consumer.
329func GuessJSONStream(r io.Reader, size int) (io.Reader, []byte, bool) {
330 buffer := bufio.NewReaderSize(r, size)
331 b, _ := buffer.Peek(size)
332 return buffer, b, hasJSONPrefix(b)
333}
334
335var jsonPrefix = []byte("{")
336
337// hasJSONPrefix returns true if the provided buffer appears to start with
338// a JSON open brace.
339func hasJSONPrefix(buf []byte) bool {
340 return hasPrefix(buf, jsonPrefix)
341}
342
// hasPrefix reports whether buf, with any leading whitespace (as
// defined by unicode.IsSpace) removed, begins with prefix.
func hasPrefix(buf []byte, prefix []byte) bool {
	return bytes.HasPrefix(bytes.TrimLeftFunc(buf, unicode.IsSpace), prefix)
}
348}