blob: 098f9bf99f7948d8a8e2b87e5af77c6dd9899a40 [file] [log] [blame]
// Copyright 2019+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
// Based on work by Yann Collet, released under BSD License.
4
5package zstd
6
7import (
8 "bytes"
9 "errors"
10 "io"
11 "sync"
12)
13
14// Decoder provides decoding of zstandard streams.
15// The decoder has been designed to operate without allocations after a warmup.
16// This means that you should store the decoder for best performance.
17// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
18// A decoder can safely be re-used even if the previous stream failed.
19// To release the resources, you must call the Close() function on a decoder.
20type Decoder struct {
21 o decoderOptions
22
23 // Unreferenced decoders, ready for use.
24 decoders chan *blockDec
25
26 // Unreferenced decoders, ready for use.
27 frames chan *frameDec
28
29 // Streams ready to be decoded.
30 stream chan decodeStream
31
32 // Current read position used for Reader functionality.
33 current decoderState
34
35 // Custom dictionaries
36 dicts map[uint32]struct{}
37
38 // streamWg is the waitgroup for all streams
39 streamWg sync.WaitGroup
40}
41
42// decoderState is used for maintaining state when the decoder
43// is used for streaming.
44type decoderState struct {
45 // current block being written to stream.
46 decodeOutput
47
48 // output in order to be written to stream.
49 output chan decodeOutput
50
51 // cancel remaining output.
52 cancel chan struct{}
53
54 flushed bool
55}
56
57var (
58 // Check the interfaces we want to support.
59 _ = io.WriterTo(&Decoder{})
60 _ = io.Reader(&Decoder{})
61)
62
63// NewReader creates a new decoder.
64// A nil Reader can be provided in which case Reset can be used to start a decode.
65//
66// A Decoder can be used in two modes:
67//
68// 1) As a stream, or
69// 2) For stateless decoding using DecodeAll or DecodeBuffer.
70//
71// Only a single stream can be decoded concurrently, but the same decoder
72// can run multiple concurrent stateless decodes. It is even possible to
73// use stateless decodes while a stream is being decoded.
74//
75// The Reset function can be used to initiate a new stream, which is will considerably
76// reduce the allocations normally caused by NewReader.
77func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
78 initPredefined()
79 var d Decoder
80 d.o.setDefault()
81 for _, o := range opts {
82 err := o(&d.o)
83 if err != nil {
84 return nil, err
85 }
86 }
87 d.current.output = make(chan decodeOutput, d.o.concurrent)
88 d.current.flushed = true
89
90 // Create decoders
91 d.decoders = make(chan *blockDec, d.o.concurrent)
92 d.frames = make(chan *frameDec, d.o.concurrent)
93 for i := 0; i < d.o.concurrent; i++ {
94 d.frames <- newFrameDec(d.o)
95 d.decoders <- newBlockDec(d.o.lowMem)
96 }
97
98 if r == nil {
99 return &d, nil
100 }
101 return &d, d.Reset(r)
102}
103
104// Read bytes from the decompressed stream into p.
105// Returns the number of bytes written and any error that occurred.
106// When the stream is done, io.EOF will be returned.
107func (d *Decoder) Read(p []byte) (int, error) {
108 if d.stream == nil {
109 return 0, errors.New("no input has been initialized")
110 }
111 var n int
112 for {
113 if len(d.current.b) > 0 {
114 filled := copy(p, d.current.b)
115 p = p[filled:]
116 d.current.b = d.current.b[filled:]
117 n += filled
118 }
119 if len(p) == 0 {
120 break
121 }
122 if len(d.current.b) == 0 {
123 // We have an error and no more data
124 if d.current.err != nil {
125 break
126 }
127 d.nextBlock()
128 }
129 }
130 if len(d.current.b) > 0 {
131 if debug {
132 println("returning", n, "still bytes left:", len(d.current.b))
133 }
134 // Only return error at end of block
135 return n, nil
136 }
137 if d.current.err != nil {
138 d.drainOutput()
139 }
140 if debug {
141 println("returning", n, d.current.err, len(d.decoders))
142 }
143 return n, d.current.err
144}
145
146// Reset will reset the decoder the supplied stream after the current has finished processing.
147// Note that this functionality cannot be used after Close has been called.
148func (d *Decoder) Reset(r io.Reader) error {
149 if d.current.err == ErrDecoderClosed {
150 return d.current.err
151 }
152 if r == nil {
153 return errors.New("nil Reader sent as input")
154 }
155
156 if d.stream == nil {
157 d.stream = make(chan decodeStream, 1)
158 d.streamWg.Add(1)
159 go d.startStreamDecoder(d.stream)
160 }
161
162 d.drainOutput()
163
164 // If bytes buffer and < 1MB, do sync decoding anyway.
165 if bb, ok := r.(*bytes.Buffer); ok && bb.Len() < 1<<20 {
166 if debug {
167 println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
168 }
169 b := bb.Bytes()
170 dst, err := d.DecodeAll(b, nil)
171 if err == nil {
172 err = io.EOF
173 }
174 d.current.b = dst
175 d.current.err = err
176 d.current.flushed = true
177 if debug {
178 println("sync decode to ", len(dst), "bytes, err:", err)
179 }
180 return nil
181 }
182
183 // Remove current block.
184 d.current.decodeOutput = decodeOutput{}
185 d.current.err = nil
186 d.current.cancel = make(chan struct{})
187 d.current.flushed = false
188 d.current.d = nil
189
190 d.stream <- decodeStream{
191 r: r,
192 output: d.current.output,
193 cancel: d.current.cancel,
194 }
195 return nil
196}
197
198// drainOutput will drain the output until errEndOfStream is sent.
199func (d *Decoder) drainOutput() {
200 if d.current.cancel != nil {
201 println("cancelling current")
202 close(d.current.cancel)
203 d.current.cancel = nil
204 }
205 if d.current.d != nil {
206 if debug {
207 printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
208 }
209 d.decoders <- d.current.d
210 d.current.d = nil
211 d.current.b = nil
212 }
213 if d.current.output == nil || d.current.flushed {
214 println("current already flushed")
215 return
216 }
217 for {
218 select {
219 case v := <-d.current.output:
220 if v.d != nil {
221 if debug {
222 printf("re-adding decoder %p", v.d)
223 }
224 d.decoders <- v.d
225 }
226 if v.err == errEndOfStream {
227 println("current flushed")
228 d.current.flushed = true
229 return
230 }
231 }
232 }
233}
234
235// WriteTo writes data to w until there's no more data to write or when an error occurs.
236// The return value n is the number of bytes written.
237// Any error encountered during the write is also returned.
238func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
239 if d.stream == nil {
240 return 0, errors.New("no input has been initialized")
241 }
242 var n int64
243 for {
244 if len(d.current.b) > 0 {
245 n2, err2 := w.Write(d.current.b)
246 n += int64(n2)
247 if err2 != nil && d.current.err == nil {
248 d.current.err = err2
249 break
250 }
251 }
252 if d.current.err != nil {
253 break
254 }
255 d.nextBlock()
256 }
257 err := d.current.err
258 if err != nil {
259 d.drainOutput()
260 }
261 if err == io.EOF {
262 err = nil
263 }
264 return n, err
265}
266
267// DecodeAll allows stateless decoding of a blob of bytes.
268// Output will be appended to dst, so if the destination size is known
269// you can pre-allocate the destination slice to avoid allocations.
270// DecodeAll can be used concurrently.
271// The Decoder concurrency limits will be respected.
272func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
273 if d.current.err == ErrDecoderClosed {
274 return dst, ErrDecoderClosed
275 }
276
277 // Grab a block decoder and frame decoder.
278 block, frame := <-d.decoders, <-d.frames
279 defer func() {
280 if debug {
281 printf("re-adding decoder: %p", block)
282 }
283 d.decoders <- block
284 frame.rawInput = nil
285 frame.bBuf = nil
286 d.frames <- frame
287 }()
288 frame.bBuf = input
289
290 for {
291 err := frame.reset(&frame.bBuf)
292 if err == io.EOF {
293 return dst, nil
294 }
295 if err != nil {
296 return dst, err
297 }
298 if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
299 return dst, ErrDecoderSizeExceeded
300 }
301 if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
302 // Never preallocate moe than 1 GB up front.
303 if uint64(cap(dst)) < frame.FrameContentSize {
304 dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
305 copy(dst2, dst)
306 dst = dst2
307 }
308 }
309 if cap(dst) == 0 {
310 // Allocate window size * 2 by default if nothing is provided and we didn't get frame content size.
311 size := frame.WindowSize * 2
312 // Cap to 1 MB.
313 if size > 1<<20 {
314 size = 1 << 20
315 }
316 dst = make([]byte, 0, frame.WindowSize)
317 }
318
319 dst, err = frame.runDecoder(dst, block)
320 if err != nil {
321 return dst, err
322 }
323 if len(frame.bBuf) == 0 {
324 break
325 }
326 }
327 return dst, nil
328}
329
330// nextBlock returns the next block.
331// If an error occurs d.err will be set.
332func (d *Decoder) nextBlock() {
333 if d.current.d != nil {
334 if debug {
335 printf("re-adding current decoder %p", d.current.d)
336 }
337 d.decoders <- d.current.d
338 d.current.d = nil
339 }
340 if d.current.err != nil {
341 // Keep error state.
342 return
343 }
344 d.current.decodeOutput = <-d.current.output
345 if debug {
346 println("got", len(d.current.b), "bytes, error:", d.current.err)
347 }
348}
349
350// Close will release all resources.
351// It is NOT possible to reuse the decoder after this.
352func (d *Decoder) Close() {
353 if d.current.err == ErrDecoderClosed {
354 return
355 }
356 d.drainOutput()
357 if d.stream != nil {
358 close(d.stream)
359 d.streamWg.Wait()
360 d.stream = nil
361 }
362 if d.decoders != nil {
363 close(d.decoders)
364 for dec := range d.decoders {
365 dec.Close()
366 }
367 d.decoders = nil
368 }
369 if d.current.d != nil {
370 d.current.d.Close()
371 d.current.d = nil
372 }
373 d.current.err = ErrDecoderClosed
374}
375
376type decodeOutput struct {
377 d *blockDec
378 b []byte
379 err error
380}
381
382type decodeStream struct {
383 r io.Reader
384
385 // Blocks ready to be written to output.
386 output chan decodeOutput
387
388 // cancel reading from the input
389 cancel chan struct{}
390}
391
// errEndOfStream is sent as the final output item of a stream, marking
// that everything from the stream has been processed.
var errEndOfStream = errors.New("end-of-stream")
394
395// Create Decoder:
396// Spawn n block decoders. These accept tasks to decode a block.
397// Create goroutine that handles stream processing, this will send history to decoders as they are available.
398// Decoders update the history as they decode.
399// When a block is returned:
400// a) history is sent to the next decoder,
401// b) content written to CRC.
402// c) return data to WRITER.
403// d) wait for next block to return data.
404// Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
405func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
406 defer d.streamWg.Done()
407 frame := newFrameDec(d.o)
408 for stream := range inStream {
409 if debug {
410 println("got new stream")
411 }
412 br := readerWrapper{r: stream.r}
413 decodeStream:
414 for {
415 err := frame.reset(&br)
416 if debug && err != nil {
417 println("Frame decoder returned", err)
418 }
419 if err != nil {
420 stream.output <- decodeOutput{
421 err: err,
422 }
423 break
424 }
425 if debug {
426 println("starting frame decoder")
427 }
428
429 // This goroutine will forward history between frames.
430 frame.frameDone.Add(1)
431 frame.initAsync()
432
433 go frame.startDecoder(stream.output)
434 decodeFrame:
435 // Go through all blocks of the frame.
436 for {
437 dec := <-d.decoders
438 select {
439 case <-stream.cancel:
440 if !frame.sendErr(dec, io.EOF) {
441 // To not let the decoder dangle, send it back.
442 stream.output <- decodeOutput{d: dec}
443 }
444 break decodeStream
445 default:
446 }
447 err := frame.next(dec)
448 switch err {
449 case io.EOF:
450 // End of current frame, no error
451 println("EOF on next block")
452 break decodeFrame
453 case nil:
454 continue
455 default:
456 println("block decoder returned", err)
457 break decodeStream
458 }
459 }
460 // All blocks have started decoding, check if there are more frames.
461 println("waiting for done")
462 frame.frameDone.Wait()
463 println("done waiting...")
464 }
465 frame.frameDone.Wait()
466 println("Sending EOS")
467 stream.output <- decodeOutput{err: errEndOfStream}
468 }
469}