blob: d212f4737f20c80f16225f7f79117ae36fa9a467 [file] [log] [blame]
khenaidoo7d3c5582021-08-11 18:09:44 -04001// Copyright 2019+ Klaus Post. All rights reserved.
2// License information can be found in the LICENSE file.
3// Based on work by Yann Collet, released under BSD License.
4
5package zstd
6
7import (
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +05308 "bytes"
9 "context"
10 "encoding/binary"
khenaidoo7d3c5582021-08-11 18:09:44 -040011 "io"
12 "sync"
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +053013
14 "github.com/klauspost/compress/zstd/internal/xxhash"
khenaidoo7d3c5582021-08-11 18:09:44 -040015)
16
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options applied in NewReader.
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// The channel is created with capacity o.concurrent and is set to nil by Close.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	// State used by nextBlockSync when the stream is decoded on the
	// caller's goroutine (set up by startSyncDecoder).
	syncStream struct {
		// decodedFrame counts the bytes decoded so far in the current frame.
		decodedFrame uint64
		// br wraps the input reader of the current stream.
		br readerWrapper
		// enabled is set by startSyncDecoder and checked by nextBlock.
		enabled bool
		// inFrame is false when the next block must start with a frame header.
		inFrame bool
	}

	// frame is the frame decoder used for stream decoding.
	// It is created lazily in Reset.
	frame *frameDec

	// Custom dictionaries.
	// Always uses copies.
	dicts map[uint32]dict

	// streamWg is the waitgroup for all streams
	streamWg sync.WaitGroup
}
49
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// Embedded, so its d, b and err fields are accessed directly
	// (for example d.current.b and d.current.err).
	decodeOutput

	// output in order to be written to stream.
	output chan decodeOutput

	// cancel remaining output.
	cancel context.CancelFunc

	// crc of current frame
	crc *xxhash.Digest

	// flushed is true when there is no pending output left to drain;
	// drainOutput returns early when it is set.
	flushed bool
}
67
68var (
69 // Check the interfaces we want to support.
70 _ = io.WriterTo(&Decoder{})
71 _ = io.Reader(&Decoder{})
72)
73
74// NewReader creates a new decoder.
75// A nil Reader can be provided in which case Reset can be used to start a decode.
76//
77// A Decoder can be used in two modes:
78//
79// 1) As a stream, or
80// 2) For stateless decoding using DecodeAll.
81//
82// Only a single stream can be decoded concurrently, but the same decoder
83// can run multiple concurrent stateless decodes. It is even possible to
84// use stateless decodes while a stream is being decoded.
85//
86// The Reset function can be used to initiate a new stream, which is will considerably
87// reduce the allocations normally caused by NewReader.
88func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
89 initPredefined()
90 var d Decoder
91 d.o.setDefault()
92 for _, o := range opts {
93 err := o(&d.o)
94 if err != nil {
95 return nil, err
96 }
97 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +053098 d.current.crc = xxhash.New()
khenaidoo7d3c5582021-08-11 18:09:44 -040099 d.current.flushed = true
100
101 if r == nil {
102 d.current.err = ErrDecoderNilInput
103 }
104
105 // Transfer option dicts.
106 d.dicts = make(map[uint32]dict, len(d.o.dicts))
107 for _, dc := range d.o.dicts {
108 d.dicts[dc.id] = dc
109 }
110 d.o.dicts = nil
111
112 // Create decoders
113 d.decoders = make(chan *blockDec, d.o.concurrent)
114 for i := 0; i < d.o.concurrent; i++ {
115 dec := newBlockDec(d.o.lowMem)
116 dec.localFrame = newFrameDec(d.o)
117 d.decoders <- dec
118 }
119
120 if r == nil {
121 return &d, nil
122 }
123 return &d, d.Reset(r)
124}
125
126// Read bytes from the decompressed stream into p.
127// Returns the number of bytes written and any error that occurred.
128// When the stream is done, io.EOF will be returned.
129func (d *Decoder) Read(p []byte) (int, error) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400130 var n int
131 for {
132 if len(d.current.b) > 0 {
133 filled := copy(p, d.current.b)
134 p = p[filled:]
135 d.current.b = d.current.b[filled:]
136 n += filled
137 }
138 if len(p) == 0 {
139 break
140 }
141 if len(d.current.b) == 0 {
142 // We have an error and no more data
143 if d.current.err != nil {
144 break
145 }
146 if !d.nextBlock(n == 0) {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530147 return n, d.current.err
khenaidoo7d3c5582021-08-11 18:09:44 -0400148 }
149 }
150 }
151 if len(d.current.b) > 0 {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530152 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400153 println("returning", n, "still bytes left:", len(d.current.b))
154 }
155 // Only return error at end of block
156 return n, nil
157 }
158 if d.current.err != nil {
159 d.drainOutput()
160 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530161 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400162 println("returning", n, d.current.err, len(d.decoders))
163 }
164 return n, d.current.err
165}
166
167// Reset will reset the decoder the supplied stream after the current has finished processing.
168// Note that this functionality cannot be used after Close has been called.
169// Reset can be called with a nil reader to release references to the previous reader.
170// After being called with a nil reader, no other operations than Reset or DecodeAll or Close
171// should be used.
172func (d *Decoder) Reset(r io.Reader) error {
173 if d.current.err == ErrDecoderClosed {
174 return d.current.err
175 }
176
177 d.drainOutput()
178
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530179 d.syncStream.br.r = nil
khenaidoo7d3c5582021-08-11 18:09:44 -0400180 if r == nil {
181 d.current.err = ErrDecoderNilInput
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530182 if len(d.current.b) > 0 {
183 d.current.b = d.current.b[:0]
184 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400185 d.current.flushed = true
186 return nil
187 }
188
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530189 // If bytes buffer and < 5MB, do sync decoding anyway.
190 if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
khenaidoo7d3c5582021-08-11 18:09:44 -0400191 bb2 := bb
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530192 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400193 println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
194 }
195 b := bb2.Bytes()
196 var dst []byte
197 if cap(d.current.b) > 0 {
198 dst = d.current.b
199 }
200
201 dst, err := d.DecodeAll(b, dst[:0])
202 if err == nil {
203 err = io.EOF
204 }
205 d.current.b = dst
206 d.current.err = err
207 d.current.flushed = true
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530208 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400209 println("sync decode to", len(dst), "bytes, err:", err)
210 }
211 return nil
212 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400213 // Remove current block.
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530214 d.stashDecoder()
khenaidoo7d3c5582021-08-11 18:09:44 -0400215 d.current.decodeOutput = decodeOutput{}
216 d.current.err = nil
khenaidoo7d3c5582021-08-11 18:09:44 -0400217 d.current.flushed = false
218 d.current.d = nil
219
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530220 // Ensure no-one else is still running...
221 d.streamWg.Wait()
222 if d.frame == nil {
223 d.frame = newFrameDec(d.o)
khenaidoo7d3c5582021-08-11 18:09:44 -0400224 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530225
226 if d.o.concurrent == 1 {
227 return d.startSyncDecoder(r)
228 }
229
230 d.current.output = make(chan decodeOutput, d.o.concurrent)
231 ctx, cancel := context.WithCancel(context.Background())
232 d.current.cancel = cancel
233 d.streamWg.Add(1)
234 go d.startStreamDecoder(ctx, r, d.current.output)
235
khenaidoo7d3c5582021-08-11 18:09:44 -0400236 return nil
237}
238
239// drainOutput will drain the output until errEndOfStream is sent.
240func (d *Decoder) drainOutput() {
241 if d.current.cancel != nil {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530242 if debugDecoder {
243 println("cancelling current")
244 }
245 d.current.cancel()
khenaidoo7d3c5582021-08-11 18:09:44 -0400246 d.current.cancel = nil
247 }
248 if d.current.d != nil {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530249 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400250 printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
251 }
252 d.decoders <- d.current.d
253 d.current.d = nil
254 d.current.b = nil
255 }
256 if d.current.output == nil || d.current.flushed {
257 println("current already flushed")
258 return
259 }
260 for v := range d.current.output {
261 if v.d != nil {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530262 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400263 printf("re-adding decoder %p", v.d)
264 }
265 d.decoders <- v.d
266 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400267 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530268 d.current.output = nil
269 d.current.flushed = true
khenaidoo7d3c5582021-08-11 18:09:44 -0400270}
271
272// WriteTo writes data to w until there's no more data to write or when an error occurs.
273// The return value n is the number of bytes written.
274// Any error encountered during the write is also returned.
275func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400276 var n int64
277 for {
278 if len(d.current.b) > 0 {
279 n2, err2 := w.Write(d.current.b)
280 n += int64(n2)
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530281 if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400282 d.current.err = err2
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530283 } else if n2 != len(d.current.b) {
284 d.current.err = io.ErrShortWrite
khenaidoo7d3c5582021-08-11 18:09:44 -0400285 }
286 }
287 if d.current.err != nil {
288 break
289 }
290 d.nextBlock(true)
291 }
292 err := d.current.err
293 if err != nil {
294 d.drainOutput()
295 }
296 if err == io.EOF {
297 err = nil
298 }
299 return n, err
300}
301
302// DecodeAll allows stateless decoding of a blob of bytes.
303// Output will be appended to dst, so if the destination size is known
304// you can pre-allocate the destination slice to avoid allocations.
305// DecodeAll can be used concurrently.
306// The Decoder concurrency limits will be respected.
307func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530308 if d.decoders == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400309 return dst, ErrDecoderClosed
310 }
311
312 // Grab a block decoder and frame decoder.
313 block := <-d.decoders
314 frame := block.localFrame
315 defer func() {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530316 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400317 printf("re-adding decoder: %p", block)
318 }
319 frame.rawInput = nil
320 frame.bBuf = nil
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530321 if frame.history.decoders.br != nil {
322 frame.history.decoders.br.in = nil
323 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400324 d.decoders <- block
325 }()
326 frame.bBuf = input
327
328 for {
329 frame.history.reset()
330 err := frame.reset(&frame.bBuf)
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530331 if err != nil {
332 if err == io.EOF {
333 if debugDecoder {
334 println("frame reset return EOF")
335 }
336 return dst, nil
khenaidoo7d3c5582021-08-11 18:09:44 -0400337 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530338 return dst, err
khenaidoo7d3c5582021-08-11 18:09:44 -0400339 }
340 if frame.DictionaryID != nil {
341 dict, ok := d.dicts[*frame.DictionaryID]
342 if !ok {
343 return nil, ErrUnknownDictionary
344 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530345 if debugDecoder {
346 println("setting dict", frame.DictionaryID)
347 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400348 frame.history.setDict(&dict)
349 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530350 if frame.WindowSize > d.o.maxWindowSize {
351 if debugDecoder {
352 println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
353 }
354 return dst, ErrWindowSizeExceeded
khenaidoo7d3c5582021-08-11 18:09:44 -0400355 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530356 if frame.FrameContentSize != fcsUnknown {
357 if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
358 return dst, ErrDecoderSizeExceeded
359 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400360 if cap(dst)-len(dst) < int(frame.FrameContentSize) {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530361 dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
khenaidoo7d3c5582021-08-11 18:09:44 -0400362 copy(dst2, dst)
363 dst = dst2
364 }
365 }
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530366
khenaidoo7d3c5582021-08-11 18:09:44 -0400367 if cap(dst) == 0 {
368 // Allocate len(input) * 2 by default if nothing is provided
369 // and we didn't get frame content size.
370 size := len(input) * 2
371 // Cap to 1 MB.
372 if size > 1<<20 {
373 size = 1 << 20
374 }
375 if uint64(size) > d.o.maxDecodedSize {
376 size = int(d.o.maxDecodedSize)
377 }
378 dst = make([]byte, 0, size)
379 }
380
381 dst, err = frame.runDecoder(dst, block)
382 if err != nil {
383 return dst, err
384 }
385 if len(frame.bBuf) == 0 {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530386 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400387 println("frame dbuf empty")
388 }
389 break
390 }
391 }
392 return dst, nil
393}
394
395// nextBlock returns the next block.
396// If an error occurs d.err will be set.
397// Optionally the function can block for new output.
398// If non-blocking mode is used the returned boolean will be false
399// if no data was available without blocking.
400func (d *Decoder) nextBlock(blocking bool) (ok bool) {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530401 if d.current.err != nil {
402 // Keep error state.
403 return false
404 }
405 d.current.b = d.current.b[:0]
406
407 // SYNC:
408 if d.syncStream.enabled {
409 if !blocking {
410 return false
411 }
412 ok = d.nextBlockSync()
413 if !ok {
414 d.stashDecoder()
415 }
416 return ok
417 }
418
419 //ASYNC:
420 d.stashDecoder()
421 if blocking {
422 d.current.decodeOutput, ok = <-d.current.output
423 } else {
424 select {
425 case d.current.decodeOutput, ok = <-d.current.output:
426 default:
427 return false
428 }
429 }
430 if !ok {
431 // This should not happen, so signal error state...
432 d.current.err = io.ErrUnexpectedEOF
433 return false
434 }
435 next := d.current.decodeOutput
436 if next.d != nil && next.d.async.newHist != nil {
437 d.current.crc.Reset()
438 }
439 if debugDecoder {
440 var tmp [4]byte
441 binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
442 println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
443 }
444
445 if !d.o.ignoreChecksum && len(next.b) > 0 {
446 n, err := d.current.crc.Write(next.b)
447 if err == nil {
448 if n != len(next.b) {
449 d.current.err = io.ErrShortWrite
450 }
451 }
452 }
453 if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 {
454 got := d.current.crc.Sum64()
455 var tmp [4]byte
456 binary.LittleEndian.PutUint32(tmp[:], uint32(got))
457 if !d.o.ignoreChecksum && !bytes.Equal(tmp[:], next.d.checkCRC) {
458 if debugDecoder {
459 println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)")
460 }
461 d.current.err = ErrCRCMismatch
462 } else {
463 if debugDecoder {
464 println("CRC ok", tmp[:])
465 }
466 }
467 }
468
469 return true
470}
471
472func (d *Decoder) nextBlockSync() (ok bool) {
473 if d.current.d == nil {
474 d.current.d = <-d.decoders
475 }
476 for len(d.current.b) == 0 {
477 if !d.syncStream.inFrame {
478 d.frame.history.reset()
479 d.current.err = d.frame.reset(&d.syncStream.br)
480 if d.current.err != nil {
481 return false
482 }
483 if d.frame.DictionaryID != nil {
484 dict, ok := d.dicts[*d.frame.DictionaryID]
485 if !ok {
486 d.current.err = ErrUnknownDictionary
487 return false
488 } else {
489 d.frame.history.setDict(&dict)
490 }
491 }
492 if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
493 d.current.err = ErrDecoderSizeExceeded
494 return false
495 }
496
497 d.syncStream.decodedFrame = 0
498 d.syncStream.inFrame = true
499 }
500 d.current.err = d.frame.next(d.current.d)
501 if d.current.err != nil {
502 return false
503 }
504 d.frame.history.ensureBlock()
505 if debugDecoder {
506 println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
507 }
508 histBefore := len(d.frame.history.b)
509 d.current.err = d.current.d.decodeBuf(&d.frame.history)
510
511 if d.current.err != nil {
512 println("error after:", d.current.err)
513 return false
514 }
515 d.current.b = d.frame.history.b[histBefore:]
516 if debugDecoder {
517 println("history after:", len(d.frame.history.b))
518 }
519
520 // Check frame size (before CRC)
521 d.syncStream.decodedFrame += uint64(len(d.current.b))
522 if d.syncStream.decodedFrame > d.frame.FrameContentSize {
523 if debugDecoder {
524 printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
525 }
526 d.current.err = ErrFrameSizeExceeded
527 return false
528 }
529
530 // Check FCS
531 if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
532 if debugDecoder {
533 printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
534 }
535 d.current.err = ErrFrameSizeMismatch
536 return false
537 }
538
539 // Update/Check CRC
540 if d.frame.HasCheckSum {
541 if !d.o.ignoreChecksum {
542 d.frame.crc.Write(d.current.b)
543 }
544 if d.current.d.Last {
545 if !d.o.ignoreChecksum {
546 d.current.err = d.frame.checkCRC()
547 } else {
548 d.current.err = d.frame.consumeCRC()
549 }
550 if d.current.err != nil {
551 println("CRC error:", d.current.err)
552 return false
553 }
554 }
555 }
556 d.syncStream.inFrame = !d.current.d.Last
557 }
558 return true
559}
560
561func (d *Decoder) stashDecoder() {
khenaidoo7d3c5582021-08-11 18:09:44 -0400562 if d.current.d != nil {
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530563 if debugDecoder {
khenaidoo7d3c5582021-08-11 18:09:44 -0400564 printf("re-adding current decoder %p", d.current.d)
565 }
566 d.decoders <- d.current.d
567 d.current.d = nil
568 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400569}
570
571// Close will release all resources.
572// It is NOT possible to reuse the decoder after this.
573func (d *Decoder) Close() {
574 if d.current.err == ErrDecoderClosed {
575 return
576 }
577 d.drainOutput()
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530578 if d.current.cancel != nil {
579 d.current.cancel()
khenaidoo7d3c5582021-08-11 18:09:44 -0400580 d.streamWg.Wait()
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530581 d.current.cancel = nil
khenaidoo7d3c5582021-08-11 18:09:44 -0400582 }
583 if d.decoders != nil {
584 close(d.decoders)
585 for dec := range d.decoders {
586 dec.Close()
587 }
588 d.decoders = nil
589 }
590 if d.current.d != nil {
591 d.current.d.Close()
592 d.current.d = nil
593 }
594 d.current.err = ErrDecoderClosed
595}
596
597// IOReadCloser returns the decoder as an io.ReadCloser for convenience.
598// Any changes to the decoder will be reflected, so the returned ReadCloser
599// can be reused along with the decoder.
600// io.WriterTo is also supported by the returned ReadCloser.
601func (d *Decoder) IOReadCloser() io.ReadCloser {
602 return closeWrapper{d: d}
603}
604
// closeWrapper adapts a *Decoder, whose Close method returns nothing,
// to the io.ReadCloser interface.
type closeWrapper struct {
	// d is the wrapped decoder; every method forwards to it.
	d *Decoder
}

// WriteTo forwards WriteTo calls to the decoder.
func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
	return c.d.WriteTo(w)
}

// Read forwards read calls to the decoder.
func (c closeWrapper) Read(p []byte) (n int, err error) {
	return c.d.Read(p)
}

// Close closes the decoder.
// The returned error is always nil.
func (c closeWrapper) Close() error {
	c.d.Close()
	return nil
}
625
// decodeOutput is a single result passed from the decoding side to the
// reading side of a stream.
type decodeOutput struct {
	// d is the block decoder associated with this output, if any.
	// The receiver hands it back to the decoder pool.
	d *blockDec
	// b is the decoded data.
	b []byte
	// err is the error associated with this output, if any.
	err error
}
631
Akash Reddy Kankanalac28f0e22025-06-16 11:00:55 +0530632func (d *Decoder) startSyncDecoder(r io.Reader) error {
633 d.frame.history.reset()
634 d.syncStream.br = readerWrapper{r: r}
635 d.syncStream.inFrame = false
636 d.syncStream.enabled = true
637 d.syncStream.decodedFrame = 0
638 return nil
khenaidoo7d3c5582021-08-11 18:09:44 -0400639}
640
// startStreamDecoder decodes the stream r asynchronously and sends the
// decoded blocks, in order, to output. It is run in its own goroutine by
// Reset and signals d.streamWg when it returns.
//
// The work is split into three stages connected by channels:
//
//	0: (this goroutine) Read frames and decode block literals.
//	1: (goroutine) Decode sequences.
//	2: (goroutine) Execute sequences, send to output.
//
// Block decoders are taken from d.decoders and travel through every stage;
// the consumer of output is responsible for returning them.
// Decoding stops when ctx is cancelled or a block carries an error.
// output is closed by stage 2 once all blocks have been forwarded.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
	defer d.streamWg.Done()
	br := readerWrapper{r: r}

	// Hand-over channels between the stages.
	var seqDecode = make(chan *blockDec, d.o.concurrent)
	var seqExecute = make(chan *blockDec, d.o.concurrent)

	// Async 1: Decode sequences...
	go func() {
		var hist history
		var hasErr bool

		for block := range seqDecode {
			// After an error, blocks are only forwarded so they reach the output.
			if hasErr {
				if block != nil {
					seqExecute <- block
				}
				continue
			}
			// First block of a frame: take over the decoding state of the new frame.
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
				}
				hist.decoders = block.async.newHist.decoders
				hist.recentOffsets = block.async.newHist.recentOffsets
				hist.windowSize = block.async.newHist.windowSize
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}
			}
			// Only compressed blocks without errors have sequences to decode.
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqExecute <- block
				continue
			}

			hist.decoders.literals = block.async.literals
			block.err = block.prepareSequences(block.async.seqData, &hist)
			if debugDecoder && block.err != nil {
				println("prepareSequences returned:", block.err)
			}
			hasErr = block.err != nil
			if block.err == nil {
				block.err = block.decodeSequences(&hist)
				if debugDecoder && block.err != nil {
					println("decodeSequences returned:", block.err)
				}
				hasErr = block.err != nil
				// block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
				block.async.seqSize = hist.decoders.seqSize
			}
			seqExecute <- block
		}
		close(seqExecute)
	}()

	// wg tracks the execute stage so its history buffer can be read back safely.
	var wg sync.WaitGroup
	wg.Add(1)

	// Async 2: Execute sequences...
	// frameHistCache lets the history buffer be re-used between streams.
	frameHistCache := d.frame.history.b
	go func() {
		var hist history
		var decodedFrame uint64
		var fcs uint64
		var hasErr bool
		for block := range seqExecute {
			out := decodeOutput{err: block.err, d: block}
			// After an error, blocks are forwarded without being executed.
			if block.err != nil || hasErr {
				hasErr = true
				output <- out
				continue
			}
			// First block of a frame: set up the history for the new frame.
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 2: new history")
				}
				hist.windowSize = block.async.newHist.windowSize
				hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}

				// Make sure the history buffer is large enough,
				// preferring the cached buffer over a new allocation.
				if cap(hist.b) < hist.allocFrameBuffer {
					if cap(frameHistCache) >= hist.allocFrameBuffer {
						hist.b = frameHistCache
					} else {
						hist.b = make([]byte, 0, hist.allocFrameBuffer)
						println("Alloc history sized", hist.allocFrameBuffer)
					}
				}
				hist.b = hist.b[:0]
				fcs = block.async.fcs
				decodedFrame = 0
			}
			do := decodeOutput{err: block.err, d: block}
			switch block.Type {
			case blockTypeRLE:
				if debugDecoder {
					println("add rle block length:", block.RLESize)
				}

				if cap(block.dst) < int(block.RLESize) {
					if block.lowMem {
						block.dst = make([]byte, block.RLESize)
					} else {
						block.dst = make([]byte, maxBlockSize)
					}
				}
				block.dst = block.dst[:block.RLESize]
				// An RLE block is its first data byte repeated RLESize times.
				v := block.data[0]
				for i := range block.dst {
					block.dst[i] = v
				}
				hist.append(block.dst)
				do.b = block.dst
			case blockTypeRaw:
				if debugDecoder {
					println("add raw block length:", len(block.data))
				}
				hist.append(block.data)
				do.b = block.data
			case blockTypeCompressed:
				if debugDecoder {
					println("execute with history length:", len(hist.b), "window:", hist.windowSize)
				}
				hist.decoders.seqSize = block.async.seqSize
				hist.decoders.literals = block.async.literals
				do.err = block.executeSequences(&hist)
				hasErr = do.err != nil
				if debugDecoder && hasErr {
					println("executeSequences returned:", do.err)
				}
				do.b = block.dst
			}
			// Validate the decoded size against the frame content size.
			if !hasErr {
				decodedFrame += uint64(len(do.b))
				if decodedFrame > fcs {
					println("fcs exceeded", block.Last, fcs, decodedFrame)
					do.err = ErrFrameSizeExceeded
					hasErr = true
				} else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
					do.err = ErrFrameSizeMismatch
					hasErr = true
				} else {
					if debugDecoder {
						println("fcs ok", block.Last, fcs, decodedFrame)
					}
				}
			}
			output <- do
		}
		close(output)
		// Hand the history buffer back for re-use; read after wg.Wait below.
		frameHistCache = hist.b
		wg.Done()
		if debugDecoder {
			println("decoder goroutines finished")
		}
	}()

	// Stage 0: read frame headers and blocks, decode literals.
decodeStream:
	for {
		var hist history
		var hasErr bool

		// decodeBlock decodes the literals of a compressed block and passes
		// the block on to the sequence decoding stage.
		decodeBlock := func(block *blockDec) {
			if hasErr {
				if block != nil {
					seqDecode <- block
				}
				return
			}
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqDecode <- block
				return
			}

			remain, err := block.decodeLiterals(block.data, &hist)
			block.err = err
			hasErr = block.err != nil
			if err == nil {
				block.async.literals = hist.decoders.literals
				block.async.seqData = remain
			} else if debugDecoder {
				println("decodeLiterals error:", err)
			}
			seqDecode <- block
		}
		frame := d.frame
		if debugDecoder {
			println("New frame...")
		}
		// historySent records whether the frame's history has been attached to a block yet.
		var historySent bool
		frame.history.reset()
		err := frame.reset(&br)
		if debugDecoder && err != nil {
			println("Frame decoder returned", err)
		}
		if err == nil && frame.DictionaryID != nil {
			dict, ok := d.dicts[*frame.DictionaryID]
			if !ok {
				err = ErrUnknownDictionary
			} else {
				frame.history.setDict(&dict)
			}
		}
		if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
			err = ErrDecoderSizeExceeded
		}
		if err != nil {
			// Deliver the error through the pipeline on a block decoder,
			// unless the stream has been cancelled.
			select {
			case <-ctx.Done():
			case dec := <-d.decoders:
				dec.sendErr(err)
				decodeBlock(dec)
			}
			break decodeStream
		}

		// Go through all blocks of the frame.
		for {
			var dec *blockDec
			select {
			case <-ctx.Done():
				break decodeStream
			case dec = <-d.decoders:
				// Once we have a decoder, we MUST return it.
			}
			err := frame.next(dec)
			if !historySent {
				// The first block of a frame carries a copy of the frame history
				// and the frame content size to the later stages.
				h := frame.history
				if debugDecoder {
					println("Alloc History:", h.allocFrameBuffer)
				}
				hist.reset()
				if h.dict != nil {
					hist.setDict(h.dict)
				}
				dec.async.newHist = &h
				dec.async.fcs = frame.FrameContentSize
				historySent = true
			} else {
				dec.async.newHist = nil
			}
			if debugDecoder && err != nil {
				println("next block returned error:", err)
			}
			dec.err = err
			dec.checkCRC = nil
			// The checksum follows the last block; attach it for the reader to verify.
			if dec.Last && frame.HasCheckSum && err == nil {
				crc, err := frame.rawInput.readSmall(4)
				if err != nil {
					println("CRC missing?", err)
					dec.err = err
				}
				var tmp [4]byte
				copy(tmp[:], crc)
				dec.checkCRC = tmp[:]
				if debugDecoder {
					println("found crc to check:", dec.checkCRC)
				}
			}
			// Read these before handing the block over to the next stage.
			err = dec.err
			last := dec.Last
			decodeBlock(dec)
			if err != nil {
				break decodeStream
			}
			if last {
				break
			}
		}
	}
	// Shut the pipeline down and wait for the execute stage to finish
	// before taking over its history buffer.
	close(seqDecode)
	wg.Wait()
	d.frame.history.b = frameHistCache
}