blob: 066680f443e53c2ff44202e844db66014774dd9d [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17// Package framer implements simple frame decoding techniques for an io.ReadCloser
18package framer
19
import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"io"
)
25
// lengthDelimitedFrameWriter wraps an io.Writer and emits every Write call as
// one length-prefixed frame.
type lengthDelimitedFrameWriter struct {
	w io.Writer
	// h is scratch space for the 4-byte length prefix, kept on the struct so
	// that it is reused across Write calls.
	h [4]byte
}

// NewLengthDelimitedFrameWriter returns an io.Writer that prefixes each chunk
// passed to Write with its length, encoded as a 4-byte big-endian uint32,
// before forwarding the chunk to w.
func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer {
	return &lengthDelimitedFrameWriter{w: w}
}

// Write writes a single frame to the nested writer, prepending it with the
// length in bytes of data (as a 4-byte, big-endian uint32).
//
// The returned count is the number of bytes of data (not of the prefix) that
// the nested writer accepted. If the prefix cannot be written in full, nothing
// from data is written and the count is 0. A body too large to be described by
// the 4-byte prefix is rejected before anything is written.
func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
	// Largest body length a uint32 prefix can express. Converting a larger
	// len(data) to uint32 would silently truncate it and desynchronize the
	// stream for every reader.
	const maxFrameLength = 1<<32 - 1
	if uint64(len(data)) > maxFrameLength {
		return 0, errors.New("framer: frame length exceeds the 4-byte length prefix")
	}

	binary.BigEndian.PutUint32(w.h[:], uint32(len(data)))
	n, err := w.w.Write(w.h[:])
	if err != nil {
		return 0, err
	}
	if n != len(w.h) {
		return 0, io.ErrShortWrite
	}
	return w.w.Write(data)
}
48
// lengthDelimitedFrameReader decodes length-prefixed frames from an
// underlying stream.
type lengthDelimitedFrameReader struct {
	r io.ReadCloser
	// remaining is the number of body bytes of the current frame that have
	// not yet been handed to the caller. Zero means the next Read starts by
	// decoding a new length prefix.
	remaining int
}

// NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed
// frames off of a stream.
//
// The protocol is:
//
//	stream: message ...
//	message: prefix body
//	prefix: 4 byte uint32 in BigEndian order, denotes length of body
//	body: bytes (0..prefix)
//
// If the buffer passed to Read is not long enough to contain an entire frame,
// io.ErrShortBuffer will be returned along with the number of bytes read.
func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser {
	return &lengthDelimitedFrameReader{r: r}
}

// Read attempts to read an entire frame into data. If data is too small to
// hold what is left of the current frame, data is filled, io.ErrShortBuffer is
// returned, and subsequent calls continue with the rest of the same frame. A
// frame is complete when err is nil.
//
// io.EOF is returned only when the stream ends cleanly on a frame boundary. A
// stream that ends inside a length prefix or inside a frame body yields
// io.ErrUnexpectedEOF; any other error from the underlying reader is returned
// unchanged, together with the number of body bytes read before it occurred.
func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
	if r.remaining <= 0 {
		var header [4]byte
		// io.ReadFull reports io.EOF when no header byte is available and
		// io.ErrUnexpectedEOF when the header is cut short.
		if _, err := io.ReadFull(r.r, header[:]); err != nil {
			return 0, err
		}
		r.remaining = int(binary.BigEndian.Uint32(header[:]))
	}

	// Never ask for more than the caller's buffer can hold.
	want := r.remaining
	if want > len(data) {
		want = len(data)
	}

	n, err := io.ReadFull(r.r, data[:want])
	r.remaining -= n
	if err != nil {
		// The read error must win over the short-buffer signal: reporting
		// io.ErrShortBuffer here would tell the caller to retry a read that
		// can never succeed. Running out of input inside a frame body is
		// always a truncation, never a clean end of stream.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return n, err
	}
	if r.remaining > 0 {
		return n, io.ErrShortBuffer
	}
	return n, nil
}

// Close closes the underlying stream.
func (r *lengthDelimitedFrameReader) Close() error {
	return r.r.Close()
}
110
// jsonFrameReader treats each top-level JSON value on a stream as one frame.
type jsonFrameReader struct {
	r       io.ReadCloser
	decoder *json.Decoder
	// remaining holds the tail of a decoded frame that did not fit into the
	// buffer of an earlier Read call.
	remaining []byte
}

// NewJSONFramedReader returns an io.Reader that will decode individual JSON objects off
// of a wire.
//
// The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate
// the read.
func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser {
	return &jsonFrameReader{
		r:       r,
		decoder: json.NewDecoder(r),
	}
}

// Read decodes the next JSON value in the stream into data and returns its
// length. If the value is longer than data, data is filled, io.ErrShortBuffer
// is returned, and subsequent calls return the rest of the same value; the
// frame is complete when err is nil. Errors from the JSON decoder are returned
// unchanged with a count of 0.
//
// Only data[:len(data)] is ever written; spare capacity beyond len(data) is
// left untouched.
func (r *jsonFrameReader) Read(data []byte) (int, error) {
	// Return whatever remaining data exists from an in-progress frame.
	if len(r.remaining) > 0 {
		n := copy(data, r.remaining)
		r.remaining = r.remaining[n:]
		if len(r.remaining) > 0 {
			return n, io.ErrShortBuffer
		}
		r.remaining = nil
		return n, nil
	}

	// RawMessage's UnmarshalJSON appends to the slice it is given. Capping the
	// capacity at len(data) means the message is either written in place
	// within data, or, when it does not fit, lands in a freshly allocated
	// array that the caller has no reference to. Without the cap the decoder
	// could write past len(data) into memory the caller never offered, and
	// the saved tail would alias the caller's array.
	n := len(data)
	m := json.RawMessage(data[:0:n])
	if err := r.decoder.Decode(&m); err != nil {
		return 0, err
	}

	// The message outgrew data: hand back the part that fits and keep the
	// rest for subsequent reads.
	if len(m) > n {
		copy(data, m)
		r.remaining = m[n:]
		return n, io.ErrShortBuffer
	}
	return len(m), nil
}

// Close closes the underlying stream.
func (r *jsonFrameReader) Close() error {
	return r.r.Close()
}