blob: f430f58b5726c8877593b0a23e150f223562770e [file] [log] [blame]
kesavandc71914f2022-03-25 11:19:03 +05301// Copyright 2019+ Klaus Post. All rights reserved.
2// License information can be found in the LICENSE file.
3// Based on work by Yann Collet, released under BSD License.
4
5package zstd
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, call Reset(r io.Reader) to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options the decoder was created with (see NewReader).
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// NewReader fills this channel with o.concurrent block decoders; it is used
	// as a pool: a decoder is received before use and sent back afterwards.
	decoders chan *blockDec

	// Streams ready to be decoded.
	// Created lazily by Reset and consumed by the startStreamDecoder goroutine.
	stream chan decodeStream

	// Current read position used for Reader functionality.
	current decoderState

	// Custom dictionaries, keyed by dictionary ID.
	// Always uses copies.
	dicts map[uint32]dict

	// streamWg is the waitgroup for all streams.
	// It tracks the startStreamDecoder goroutine so Close can wait for it to exit.
	streamWg sync.WaitGroup
}
38
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// The embedded decodeOutput provides the d (block decoder to return to
	// the pool), b (pending decoded bytes) and err fields used by Read/WriteTo.
	decodeOutput

	// output in order to be written to stream.
	// Created in NewReader with a buffer of o.concurrent entries.
	output chan decodeOutput

	// cancel remaining output.
	// Closed by drainOutput to ask the stream goroutine to stop; Reset makes
	// a fresh channel for every asynchronous stream it starts.
	cancel chan struct{}

	// flushed reports whether output has been drained up to the
	// errEndOfStream marker, so drainOutput has nothing left to receive.
	flushed bool
}
53
54var (
55 // Check the interfaces we want to support.
56 _ = io.WriterTo(&Decoder{})
57 _ = io.Reader(&Decoder{})
58)
59
60// NewReader creates a new decoder.
61// A nil Reader can be provided in which case Reset can be used to start a decode.
62//
63// A Decoder can be used in two modes:
64//
65// 1) As a stream, or
66// 2) For stateless decoding using DecodeAll.
67//
68// Only a single stream can be decoded concurrently, but the same decoder
69// can run multiple concurrent stateless decodes. It is even possible to
70// use stateless decodes while a stream is being decoded.
71//
72// The Reset function can be used to initiate a new stream, which is will considerably
73// reduce the allocations normally caused by NewReader.
74func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
75 initPredefined()
76 var d Decoder
77 d.o.setDefault()
78 for _, o := range opts {
79 err := o(&d.o)
80 if err != nil {
81 return nil, err
82 }
83 }
84 d.current.output = make(chan decodeOutput, d.o.concurrent)
85 d.current.flushed = true
86
87 if r == nil {
88 d.current.err = ErrDecoderNilInput
89 }
90
91 // Transfer option dicts.
92 d.dicts = make(map[uint32]dict, len(d.o.dicts))
93 for _, dc := range d.o.dicts {
94 d.dicts[dc.id] = dc
95 }
96 d.o.dicts = nil
97
98 // Create decoders
99 d.decoders = make(chan *blockDec, d.o.concurrent)
100 for i := 0; i < d.o.concurrent; i++ {
101 dec := newBlockDec(d.o.lowMem)
102 dec.localFrame = newFrameDec(d.o)
103 d.decoders <- dec
104 }
105
106 if r == nil {
107 return &d, nil
108 }
109 return &d, d.Reset(r)
110}
111
112// Read bytes from the decompressed stream into p.
113// Returns the number of bytes written and any error that occurred.
114// When the stream is done, io.EOF will be returned.
115func (d *Decoder) Read(p []byte) (int, error) {
116 var n int
117 for {
118 if len(d.current.b) > 0 {
119 filled := copy(p, d.current.b)
120 p = p[filled:]
121 d.current.b = d.current.b[filled:]
122 n += filled
123 }
124 if len(p) == 0 {
125 break
126 }
127 if len(d.current.b) == 0 {
128 // We have an error and no more data
129 if d.current.err != nil {
130 break
131 }
132 if !d.nextBlock(n == 0) {
133 return n, nil
134 }
135 }
136 }
137 if len(d.current.b) > 0 {
138 if debugDecoder {
139 println("returning", n, "still bytes left:", len(d.current.b))
140 }
141 // Only return error at end of block
142 return n, nil
143 }
144 if d.current.err != nil {
145 d.drainOutput()
146 }
147 if debugDecoder {
148 println("returning", n, d.current.err, len(d.decoders))
149 }
150 return n, d.current.err
151}
152
153// Reset will reset the decoder the supplied stream after the current has finished processing.
154// Note that this functionality cannot be used after Close has been called.
155// Reset can be called with a nil reader to release references to the previous reader.
156// After being called with a nil reader, no other operations than Reset or DecodeAll or Close
157// should be used.
158func (d *Decoder) Reset(r io.Reader) error {
159 if d.current.err == ErrDecoderClosed {
160 return d.current.err
161 }
162
163 d.drainOutput()
164
165 if r == nil {
166 d.current.err = ErrDecoderNilInput
167 if len(d.current.b) > 0 {
168 d.current.b = d.current.b[:0]
169 }
170 d.current.flushed = true
171 return nil
172 }
173
174 // If bytes buffer and < 5MB, do sync decoding anyway.
175 if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
176 bb2 := bb
177 if debugDecoder {
178 println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
179 }
180 b := bb2.Bytes()
181 var dst []byte
182 if cap(d.current.b) > 0 {
183 dst = d.current.b
184 }
185
186 dst, err := d.DecodeAll(b, dst[:0])
187 if err == nil {
188 err = io.EOF
189 }
190 d.current.b = dst
191 d.current.err = err
192 d.current.flushed = true
193 if debugDecoder {
194 println("sync decode to", len(dst), "bytes, err:", err)
195 }
196 return nil
197 }
198
199 if d.stream == nil {
200 d.stream = make(chan decodeStream, 1)
201 d.streamWg.Add(1)
202 go d.startStreamDecoder(d.stream)
203 }
204
205 // Remove current block.
206 d.current.decodeOutput = decodeOutput{}
207 d.current.err = nil
208 d.current.cancel = make(chan struct{})
209 d.current.flushed = false
210 d.current.d = nil
211
212 d.stream <- decodeStream{
213 r: r,
214 output: d.current.output,
215 cancel: d.current.cancel,
216 }
217 return nil
218}
219
// drainOutput will drain the output until errEndOfStream is sent.
//
// It closes the cancel channel of a running stream (asking the stream
// goroutine to stop) and returns the block decoder held in d.current to the pool.
// Unless the output is already flushed, it then receives from d.current.output,
// returning every block decoder it finds to the pool, until the
// errEndOfStream marker arrives.
//
// NOTE(review): several println calls here are not wrapped in debugDecoder;
// presumably the package defines its own println that gates output on the
// debug flags — confirm, otherwise these print to stderr unconditionally.
func (d *Decoder) drainOutput() {
	if d.current.cancel != nil {
		println("cancelling current")
		close(d.current.cancel)
		// Nil it so a later call does not close the channel twice.
		d.current.cancel = nil
	}
	if d.current.d != nil {
		if debugDecoder {
			printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
		}
		d.decoders <- d.current.d
		d.current.d = nil
		// Drop b together with the decoder it was delivered with.
		d.current.b = nil
	}
	if d.current.output == nil || d.current.flushed {
		println("current already flushed")
		return
	}
	// Only the errEndOfStream marker ends this loop.
	for v := range d.current.output {
		if v.d != nil {
			if debugDecoder {
				printf("re-adding decoder %p", v.d)
			}
			d.decoders <- v.d
		}
		if v.err == errEndOfStream {
			println("current flushed")
			d.current.flushed = true
			return
		}
	}
}
253
254// WriteTo writes data to w until there's no more data to write or when an error occurs.
255// The return value n is the number of bytes written.
256// Any error encountered during the write is also returned.
257func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
258 var n int64
259 for {
260 if len(d.current.b) > 0 {
261 n2, err2 := w.Write(d.current.b)
262 n += int64(n2)
263 if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
264 d.current.err = err2
265 } else if n2 != len(d.current.b) {
266 d.current.err = io.ErrShortWrite
267 }
268 }
269 if d.current.err != nil {
270 break
271 }
272 d.nextBlock(true)
273 }
274 err := d.current.err
275 if err != nil {
276 d.drainOutput()
277 }
278 if err == io.EOF {
279 err = nil
280 }
281 return n, err
282}
283
284// DecodeAll allows stateless decoding of a blob of bytes.
285// Output will be appended to dst, so if the destination size is known
286// you can pre-allocate the destination slice to avoid allocations.
287// DecodeAll can be used concurrently.
288// The Decoder concurrency limits will be respected.
289func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
290 if d.current.err == ErrDecoderClosed {
291 return dst, ErrDecoderClosed
292 }
293
294 // Grab a block decoder and frame decoder.
295 block := <-d.decoders
296 frame := block.localFrame
297 defer func() {
298 if debugDecoder {
299 printf("re-adding decoder: %p", block)
300 }
301 frame.rawInput = nil
302 frame.bBuf = nil
303 d.decoders <- block
304 }()
305 frame.bBuf = input
306
307 for {
308 frame.history.reset()
309 err := frame.reset(&frame.bBuf)
310 if err == io.EOF {
311 if debugDecoder {
312 println("frame reset return EOF")
313 }
314 return dst, nil
315 }
316 if frame.DictionaryID != nil {
317 dict, ok := d.dicts[*frame.DictionaryID]
318 if !ok {
319 return nil, ErrUnknownDictionary
320 }
321 frame.history.setDict(&dict)
322 }
323 if err != nil {
324 return dst, err
325 }
326 if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
327 return dst, ErrDecoderSizeExceeded
328 }
329 if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
330 // Never preallocate moe than 1 GB up front.
331 if cap(dst)-len(dst) < int(frame.FrameContentSize) {
332 dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
333 copy(dst2, dst)
334 dst = dst2
335 }
336 }
337 if cap(dst) == 0 {
338 // Allocate len(input) * 2 by default if nothing is provided
339 // and we didn't get frame content size.
340 size := len(input) * 2
341 // Cap to 1 MB.
342 if size > 1<<20 {
343 size = 1 << 20
344 }
345 if uint64(size) > d.o.maxDecodedSize {
346 size = int(d.o.maxDecodedSize)
347 }
348 dst = make([]byte, 0, size)
349 }
350
351 dst, err = frame.runDecoder(dst, block)
352 if err != nil {
353 return dst, err
354 }
355 if len(frame.bBuf) == 0 {
356 if debugDecoder {
357 println("frame dbuf empty")
358 }
359 break
360 }
361 }
362 return dst, nil
363}
364
365// nextBlock returns the next block.
366// If an error occurs d.err will be set.
367// Optionally the function can block for new output.
368// If non-blocking mode is used the returned boolean will be false
369// if no data was available without blocking.
370func (d *Decoder) nextBlock(blocking bool) (ok bool) {
371 if d.current.d != nil {
372 if debugDecoder {
373 printf("re-adding current decoder %p", d.current.d)
374 }
375 d.decoders <- d.current.d
376 d.current.d = nil
377 }
378 if d.current.err != nil {
379 // Keep error state.
380 return blocking
381 }
382
383 if blocking {
384 d.current.decodeOutput = <-d.current.output
385 } else {
386 select {
387 case d.current.decodeOutput = <-d.current.output:
388 default:
389 return false
390 }
391 }
392 if debugDecoder {
393 println("got", len(d.current.b), "bytes, error:", d.current.err)
394 }
395 return true
396}
397
398// Close will release all resources.
399// It is NOT possible to reuse the decoder after this.
400func (d *Decoder) Close() {
401 if d.current.err == ErrDecoderClosed {
402 return
403 }
404 d.drainOutput()
405 if d.stream != nil {
406 close(d.stream)
407 d.streamWg.Wait()
408 d.stream = nil
409 }
410 if d.decoders != nil {
411 close(d.decoders)
412 for dec := range d.decoders {
413 dec.Close()
414 }
415 d.decoders = nil
416 }
417 if d.current.d != nil {
418 d.current.d.Close()
419 d.current.d = nil
420 }
421 d.current.err = ErrDecoderClosed
422}
423
424// IOReadCloser returns the decoder as an io.ReadCloser for convenience.
425// Any changes to the decoder will be reflected, so the returned ReadCloser
426// can be reused along with the decoder.
427// io.WriterTo is also supported by the returned ReadCloser.
428func (d *Decoder) IOReadCloser() io.ReadCloser {
429 return closeWrapper{d: d}
430}
431
432// closeWrapper wraps a function call as a closer.
433type closeWrapper struct {
434 d *Decoder
435}
436
437// WriteTo forwards WriteTo calls to the decoder.
438func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
439 return c.d.WriteTo(w)
440}
441
442// Read forwards read calls to the decoder.
443func (c closeWrapper) Read(p []byte) (n int, err error) {
444 return c.d.Read(p)
445}
446
447// Close closes the decoder.
448func (c closeWrapper) Close() error {
449 c.d.Close()
450 return nil
451}
452
// decodeOutput is one item sent on a stream's output channel.
type decodeOutput struct {
	// d is a block decoder to send back to Decoder.decoders once b has been
	// consumed; it may also be sent alone, just to return it to the pool.
	d *blockDec
	// b holds decoded bytes ready to be handed to the caller.
	b []byte
	// err is the error delivered with this output; errEndOfStream marks the
	// last item of a stream.
	err error
}
458
// decodeStream is a request, sent by Reset to the stream goroutine, to decode r.
type decodeStream struct {
	// r is the compressed input to decode.
	r io.Reader

	// Blocks ready to be written to output.
	output chan decodeOutput

	// cancel reading from the input.
	// Closed by drainOutput; checked before each block is decoded.
	cancel chan struct{}
}
468
// errEndOfStream indicates that everything from the stream was read.
// startStreamDecoder sends it as the final output of each stream, and
// drainOutput keeps receiving until it sees this marker.
var errEndOfStream = errors.New("end-of-stream")
471
// Create Decoder:
// Spawn n block decoders. These accept tasks to decode a block.
// Create goroutine that handles stream processing, this will send history to decoders as they are available.
// Decoders update the history as they decode.
// When a block is returned:
// a) history is sent to the next decoder,
// b) content written to CRC.
// c) return data to WRITER.
// d) wait for next block to return data.
// Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
//
// startStreamDecoder is the background goroutine started by Reset. It decodes
// each stream received on inStream, frame by frame, until inStream is closed
// (Close closes it and then waits on d.streamWg). However a stream ends, its
// last output is errEndOfStream.
func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
	defer d.streamWg.Done()
	// A single frame decoder is reused for every frame of every stream.
	frame := newFrameDec(d.o)
	for stream := range inStream {
		if debugDecoder {
			println("got new stream")
		}
		br := readerWrapper{r: stream.r}
	decodeStream:
		for {
			// Parse the next frame header from the input.
			frame.history.reset()
			err := frame.reset(&br)
			if debugDecoder && err != nil {
				println("Frame decoder returned", err)
			}
			// Only resolve the dictionary once the header parsed without error.
			if err == nil && frame.DictionaryID != nil {
				dict, ok := d.dicts[*frame.DictionaryID]
				if !ok {
					err = ErrUnknownDictionary
				} else {
					frame.history.setDict(&dict)
				}
			}
			if err != nil {
				// Forward the error to the reader and stop decoding this stream.
				// (Presumably frame.reset reports io.EOF when no further frame
				// exists — TODO confirm.)
				stream.output <- decodeOutput{
					err: err,
				}
				break
			}
			if debugDecoder {
				println("starting frame decoder")
			}

			// This goroutine will forward history between frames.
			// NOTE(review): frameDone is presumably marked Done by
			// frame.startDecoder when the frame completes — confirm.
			frame.frameDone.Add(1)
			frame.initAsync()

			go frame.startDecoder(stream.output)
		decodeFrame:
			// Go through all blocks of the frame.
			for {
				// Wait for a free block decoder from the pool.
				dec := <-d.decoders
				select {
				case <-stream.cancel:
					// drainOutput closed cancel: abandon this stream.
					if !frame.sendErr(dec, io.EOF) {
						// To not let the decoder dangle, send it back.
						stream.output <- decodeOutput{d: dec}
					}
					break decodeStream
				default:
				}
				err := frame.next(dec)
				switch err {
				case io.EOF:
					// End of current frame, no error
					println("EOF on next block")
					break decodeFrame
				case nil:
					continue
				default:
					// NOTE(review): the error is not sent on stream.output here;
					// presumably frame.next / the frame decoder delivers it — confirm.
					println("block decoder returned", err)
					break decodeStream
				}
			}
			// All blocks have started decoding, check if there are more frames.
			println("waiting for done")
			frame.frameDone.Wait()
			println("done waiting...")
		}
		// Wait for any in-flight frame before signalling end of stream.
		frame.frameDone.Wait()
		println("Sending EOS")
		stream.output <- decodeOutput{err: errEndOfStream}
	}
}