blob: 989c79f8c3150e9afb63322fb481a7dfa05faf10 [file] [log] [blame]
kesavandc71914f2022-03-25 11:19:03 +05301// Copyright 2019+ Klaus Post. All rights reserved.
2// License information can be found in the LICENSE file.
3// Based on work by Yann Collet, released under BSD License.
4
5package zstd
6
7import (
8 "bytes"
9 "encoding/hex"
10 "errors"
11 "hash"
12 "io"
13 "sync"
14
15 "github.com/klauspost/compress/zstd/internal/xxhash"
16)
17
// frameDec holds the state used to decode one zstd frame: the parsed frame
// header fields, the history shared between the frame's blocks, and the
// bookkeeping for asynchronous block decoding.
type frameDec struct {
	// o holds the decoder options the frame decoder was created with.
	o decoderOptions
	// crc accumulates a hash of the decoded output when HasCheckSum is set.
	crc hash.Hash64
	// offset tracks the output position; it is only updated when debugDecoder is enabled.
	offset int64

	// WindowSize is the window size derived from the frame header.
	WindowSize uint64

	// In order queue of blocks being decoded.
	decoding chan *blockDec

	// Frame history passed between blocks
	history history

	// rawInput is the input the frame header was read from; blocks and the
	// trailing checksum are read from it as well.
	rawInput byteBuffer

	// Byte buffer that can be reused for small input blocks.
	bBuf byteBuf

	// FrameContentSize is the Frame_Content_Size header field, or 0 when the
	// header does not carry one.
	FrameContentSize uint64
	// frameDone is marked Done by startDecoder when it returns.
	frameDone sync.WaitGroup

	// DictionaryID is the dictionary ID from the header, or nil when the
	// header has none or it is 0.
	DictionaryID *uint32
	// HasCheckSum reports whether the frame header announced a content checksum.
	HasCheckSum bool
	// SingleSegment is the Single_Segment_flag of the frame header descriptor.
	SingleSegment bool

	// asyncRunning indicates whether the async routine processes input on 'decoding'.
	// asyncRunningMu guards asyncRunning.
	asyncRunningMu sync.Mutex
	asyncRunning   bool
}
47
// Window size limits, in bytes.
const (
	// MinWindowSize is the smallest window a frame may declare: 1 KiB.
	// Frames whose window is smaller are rejected by the decoder.
	MinWindowSize = 1024

	// MaxWindowSize is the largest window the encoder uses, and also the
	// decoder's default upper bound on accepted windows: 512 MiB.
	MaxWindowSize = 512 << 20
)
56
var (
	// frameMagic is the zstd frame magic number 0xFD2FB528 as it appears in
	// the stream (little-endian byte order).
	frameMagic = []byte("\x28\xb5\x2f\xfd")

	// skippableFrameMagic holds the three upper bytes of a skippable frame's
	// magic number, in stream order. The first stream byte is matched
	// separately and may be anything in 0x50-0x5F.
	skippableFrameMagic = []byte("\x2a\x4d\x18")
)
61
62func newFrameDec(o decoderOptions) *frameDec {
63 if o.maxWindowSize > o.maxDecodedSize {
64 o.maxWindowSize = o.maxDecodedSize
65 }
66 d := frameDec{
67 o: o,
68 }
69 return &d
70}
71
72// reset will read the frame header and prepare for block decoding.
73// If nothing can be read from the input, io.EOF will be returned.
74// Any other error indicated that the stream contained data, but
75// there was a problem.
76func (d *frameDec) reset(br byteBuffer) error {
77 d.HasCheckSum = false
78 d.WindowSize = 0
79 var signature [4]byte
80 for {
81 var err error
82 // Check if we can read more...
83 b, err := br.readSmall(1)
84 switch err {
85 case io.EOF, io.ErrUnexpectedEOF:
86 return io.EOF
87 default:
88 return err
89 case nil:
90 signature[0] = b[0]
91 }
92 // Read the rest, don't allow io.ErrUnexpectedEOF
93 b, err = br.readSmall(3)
94 switch err {
95 case io.EOF:
96 return io.EOF
97 default:
98 return err
99 case nil:
100 copy(signature[1:], b)
101 }
102
103 if !bytes.Equal(signature[1:4], skippableFrameMagic) || signature[0]&0xf0 != 0x50 {
104 if debugDecoder {
105 println("Not skippable", hex.EncodeToString(signature[:]), hex.EncodeToString(skippableFrameMagic))
106 }
107 // Break if not skippable frame.
108 break
109 }
110 // Read size to skip
111 b, err = br.readSmall(4)
112 if err != nil {
113 if debugDecoder {
114 println("Reading Frame Size", err)
115 }
116 return err
117 }
118 n := uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
119 println("Skipping frame with", n, "bytes.")
120 err = br.skipN(int(n))
121 if err != nil {
122 if debugDecoder {
123 println("Reading discarded frame", err)
124 }
125 return err
126 }
127 }
128 if !bytes.Equal(signature[:], frameMagic) {
129 if debugDecoder {
130 println("Got magic numbers: ", signature, "want:", frameMagic)
131 }
132 return ErrMagicMismatch
133 }
134
135 // Read Frame_Header_Descriptor
136 fhd, err := br.readByte()
137 if err != nil {
138 if debugDecoder {
139 println("Reading Frame_Header_Descriptor", err)
140 }
141 return err
142 }
143 d.SingleSegment = fhd&(1<<5) != 0
144
145 if fhd&(1<<3) != 0 {
146 return errors.New("reserved bit set on frame header")
147 }
148
149 // Read Window_Descriptor
150 // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#window_descriptor
151 d.WindowSize = 0
152 if !d.SingleSegment {
153 wd, err := br.readByte()
154 if err != nil {
155 if debugDecoder {
156 println("Reading Window_Descriptor", err)
157 }
158 return err
159 }
160 printf("raw: %x, mantissa: %d, exponent: %d\n", wd, wd&7, wd>>3)
161 windowLog := 10 + (wd >> 3)
162 windowBase := uint64(1) << windowLog
163 windowAdd := (windowBase / 8) * uint64(wd&0x7)
164 d.WindowSize = windowBase + windowAdd
165 }
166
167 // Read Dictionary_ID
168 // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#dictionary_id
169 d.DictionaryID = nil
170 if size := fhd & 3; size != 0 {
171 if size == 3 {
172 size = 4
173 }
174
175 b, err := br.readSmall(int(size))
176 if err != nil {
177 println("Reading Dictionary_ID", err)
178 return err
179 }
180 var id uint32
181 switch size {
182 case 1:
183 id = uint32(b[0])
184 case 2:
185 id = uint32(b[0]) | (uint32(b[1]) << 8)
186 case 4:
187 id = uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
188 }
189 if debugDecoder {
190 println("Dict size", size, "ID:", id)
191 }
192 if id > 0 {
193 // ID 0 means "sorry, no dictionary anyway".
194 // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#dictionary-format
195 d.DictionaryID = &id
196 }
197 }
198
199 // Read Frame_Content_Size
200 // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#frame_content_size
201 var fcsSize int
202 v := fhd >> 6
203 switch v {
204 case 0:
205 if d.SingleSegment {
206 fcsSize = 1
207 }
208 default:
209 fcsSize = 1 << v
210 }
211 d.FrameContentSize = 0
212 if fcsSize > 0 {
213 b, err := br.readSmall(fcsSize)
214 if err != nil {
215 println("Reading Frame content", err)
216 return err
217 }
218 switch fcsSize {
219 case 1:
220 d.FrameContentSize = uint64(b[0])
221 case 2:
222 // When FCS_Field_Size is 2, the offset of 256 is added.
223 d.FrameContentSize = uint64(b[0]) | (uint64(b[1]) << 8) + 256
224 case 4:
225 d.FrameContentSize = uint64(b[0]) | (uint64(b[1]) << 8) | (uint64(b[2]) << 16) | (uint64(b[3]) << 24)
226 case 8:
227 d1 := uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
228 d2 := uint32(b[4]) | (uint32(b[5]) << 8) | (uint32(b[6]) << 16) | (uint32(b[7]) << 24)
229 d.FrameContentSize = uint64(d1) | (uint64(d2) << 32)
230 }
231 if debugDecoder {
232 println("field size bits:", v, "fcsSize:", fcsSize, "FrameContentSize:", d.FrameContentSize, hex.EncodeToString(b[:fcsSize]), "singleseg:", d.SingleSegment, "window:", d.WindowSize)
233 }
234 }
235 // Move this to shared.
236 d.HasCheckSum = fhd&(1<<2) != 0
237 if d.HasCheckSum {
238 if d.crc == nil {
239 d.crc = xxhash.New()
240 }
241 d.crc.Reset()
242 }
243
244 if d.WindowSize == 0 && d.SingleSegment {
245 // We may not need window in this case.
246 d.WindowSize = d.FrameContentSize
247 if d.WindowSize < MinWindowSize {
248 d.WindowSize = MinWindowSize
249 }
250 }
251
252 if d.WindowSize > uint64(d.o.maxWindowSize) {
253 if debugDecoder {
254 printf("window size %d > max %d\n", d.WindowSize, d.o.maxWindowSize)
255 }
256 return ErrWindowSizeExceeded
257 }
258 // The minimum Window_Size is 1 KB.
259 if d.WindowSize < MinWindowSize {
260 if debugDecoder {
261 println("got window size: ", d.WindowSize)
262 }
263 return ErrWindowSizeTooSmall
264 }
265 d.history.windowSize = int(d.WindowSize)
266 if d.o.lowMem && d.history.windowSize < maxBlockSize {
267 d.history.maxSize = d.history.windowSize * 2
268 } else {
269 d.history.maxSize = d.history.windowSize + maxBlockSize
270 }
271 // history contains input - maybe we do something
272 d.rawInput = br
273 return nil
274}
275
276// next will start decoding the next block from stream.
277func (d *frameDec) next(block *blockDec) error {
278 if debugDecoder {
279 printf("decoding new block %p:%p", block, block.data)
280 }
281 err := block.reset(d.rawInput, d.WindowSize)
282 if err != nil {
283 println("block error:", err)
284 // Signal the frame decoder we have a problem.
285 d.sendErr(block, err)
286 return err
287 }
288 block.input <- struct{}{}
289 if debugDecoder {
290 println("next block:", block)
291 }
292 d.asyncRunningMu.Lock()
293 defer d.asyncRunningMu.Unlock()
294 if !d.asyncRunning {
295 return nil
296 }
297 if block.Last {
298 // We indicate the frame is done by sending io.EOF
299 d.decoding <- block
300 return io.EOF
301 }
302 d.decoding <- block
303 return nil
304}
305
306// sendEOF will queue an error block on the frame.
307// This will cause the frame decoder to return when it encounters the block.
308// Returns true if the decoder was added.
309func (d *frameDec) sendErr(block *blockDec, err error) bool {
310 d.asyncRunningMu.Lock()
311 defer d.asyncRunningMu.Unlock()
312 if !d.asyncRunning {
313 return false
314 }
315
316 println("sending error", err.Error())
317 block.sendErr(err)
318 d.decoding <- block
319 return true
320}
321
322// checkCRC will check the checksum if the frame has one.
323// Will return ErrCRCMismatch if crc check failed, otherwise nil.
324func (d *frameDec) checkCRC() error {
325 if !d.HasCheckSum {
326 return nil
327 }
328 var tmp [4]byte
329 got := d.crc.Sum64()
330 // Flip to match file order.
331 tmp[0] = byte(got >> 0)
332 tmp[1] = byte(got >> 8)
333 tmp[2] = byte(got >> 16)
334 tmp[3] = byte(got >> 24)
335
336 // We can overwrite upper tmp now
337 want, err := d.rawInput.readSmall(4)
338 if err != nil {
339 println("CRC missing?", err)
340 return err
341 }
342
343 if !bytes.Equal(tmp[:], want) {
344 if debugDecoder {
345 println("CRC Check Failed:", tmp[:], "!=", want)
346 }
347 return ErrCRCMismatch
348 }
349 if debugDecoder {
350 println("CRC ok", tmp[:])
351 }
352 return nil
353}
354
355func (d *frameDec) initAsync() {
356 if !d.o.lowMem && !d.SingleSegment {
357 // set max extra size history to 2MB.
358 d.history.maxSize = d.history.windowSize + maxBlockSize
359 }
360 // re-alloc if more than one extra block size.
361 if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize {
362 d.history.b = make([]byte, 0, d.history.maxSize)
363 }
364 if cap(d.history.b) < d.history.maxSize {
365 d.history.b = make([]byte, 0, d.history.maxSize)
366 }
367 if cap(d.decoding) < d.o.concurrent {
368 d.decoding = make(chan *blockDec, d.o.concurrent)
369 }
370 if debugDecoder {
371 h := d.history
372 printf("history init. len: %d, cap: %d", len(h.b), cap(h.b))
373 }
374 d.asyncRunningMu.Lock()
375 d.asyncRunning = true
376 d.asyncRunningMu.Unlock()
377}
378
// startDecoder will start decoding blocks and write them to the writer.
// Blocks are taken in order from d.decoding, each is handed &d.history, and
// each block's result is sent on output.
// The decoder will stop as soon as an error occurs or at end of frame.
// When it returns, any blocks still queued on d.decoding are drained to
// output, and d.frameDone.Done() is called.
func (d *frameDec) startDecoder(output chan decodeOutput) {
	// written counts the bytes this frame has produced so far.
	written := int64(0)

	defer func() {
		// Stop next/sendErr from queueing more blocks.
		d.asyncRunningMu.Lock()
		d.asyncRunning = false
		d.asyncRunningMu.Unlock()

		// Drain the currently decoding.
		// The history is flagged as errored before it is handed to the queued
		// blocks; presumably this makes them skip real decoding — TODO confirm
		// against blockDec.
		d.history.error = true
	flushdone:
		for {
			select {
			case b := <-d.decoding:
				b.history <- &d.history
				output <- <-b.result
			default:
				break flushdone
			}
		}
		println("frame decoder done, signalling done")
		d.frameDone.Done()
	}()
	// Get decoder for first block.
	block := <-d.decoding
	block.history <- &d.history
	for {
		var next *blockDec
		// Get result
		r := <-block.result
		if r.err != nil {
			println("Result contained error", r.err)
			output <- r
			return
		}
		if debugDecoder {
			println("got result, from ", d.offset, "to", d.offset+int64(len(r.b)))
			d.offset += int64(len(r.b))
		}
		if !block.Last {
			// Send history to next block
			select {
			case next = <-d.decoding:
				if debugDecoder {
					println("Sending ", len(d.history.b), "bytes as history")
				}
				next.history <- &d.history
			default:
				// Wait until we have sent the block, so
				// other decoders can potentially get the decoder.
				next = nil
			}
		}

		// Add checksum, async to decoding.
		if d.HasCheckSum {
			n, err := d.crc.Write(r.b)
			if err != nil {
				r.err = err
				if n != len(r.b) {
					r.err = io.ErrShortWrite
				}
				output <- r
				return
			}
		}
		written += int64(len(r.b))
		// A single-segment frame must not produce more than its declared size.
		if d.SingleSegment && uint64(written) > d.FrameContentSize {
			println("runDecoder: single segment and", uint64(written), ">", d.FrameContentSize)
			r.err = ErrFrameSizeExceeded
			output <- r
			return
		}
		if block.Last {
			// The checksum (if any) follows the last block.
			r.err = d.checkCRC()
			output <- r
			return
		}
		output <- r
		if next == nil {
			// There was no decoder available, we wait for one now that we have sent to the writer.
			if debugDecoder {
				println("Sending ", len(d.history.b), " bytes as history")
			}
			next = <-d.decoding
			next.history <- &d.history
		}
		block = next
	}
}
473
474// runDecoder will create a sync decoder that will decode a block of data.
475func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) {
476 saved := d.history.b
477
478 // We use the history for output to avoid copying it.
479 d.history.b = dst
480 // Store input length, so we only check new data.
481 crcStart := len(dst)
482 var err error
483 for {
484 err = dec.reset(d.rawInput, d.WindowSize)
485 if err != nil {
486 break
487 }
488 if debugDecoder {
489 println("next block:", dec)
490 }
491 err = dec.decodeBuf(&d.history)
492 if err != nil || dec.Last {
493 break
494 }
495 if uint64(len(d.history.b)) > d.o.maxDecodedSize {
496 err = ErrDecoderSizeExceeded
497 break
498 }
499 if d.SingleSegment && uint64(len(d.history.b)) > d.o.maxDecodedSize {
500 println("runDecoder: single segment and", uint64(len(d.history.b)), ">", d.o.maxDecodedSize)
501 err = ErrFrameSizeExceeded
502 break
503 }
504 }
505 dst = d.history.b
506 if err == nil {
507 if d.HasCheckSum {
508 var n int
509 n, err = d.crc.Write(dst[crcStart:])
510 if err == nil {
511 if n != len(dst)-crcStart {
512 err = io.ErrShortWrite
513 } else {
514 err = d.checkCRC()
515 }
516 }
517 }
518 }
519 d.history.b = saved
520 return dst, err
521}