khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 1 | // Copyright 2019+ Klaus Post. All rights reserved. |
| 2 | // License information can be found in the LICENSE file. |
| 3 | // Based on work by Yann Collet, released under BSD License. |
| 4 | |
| 5 | package zstd |
| 6 | |
| 7 | import ( |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 8 | "bytes" |
| 9 | "context" |
| 10 | "encoding/binary" |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 11 | "io" |
| 12 | "sync" |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 13 | |
| 14 | "github.com/klauspost/compress/zstd/internal/xxhash" |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 15 | ) |
| 16 | |
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, call Reset(r io.Reader) to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options applied in NewReader.
	o decoderOptions

	// Unreferenced decoders, ready for use.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	syncStream struct {
		// decodedFrame counts the bytes decoded so far in the current frame.
		decodedFrame uint64
		// br wraps the input reader supplied to Reset.
		br readerWrapper
		// enabled is set by startSyncDecoder and makes nextBlock decode synchronously.
		enabled bool
		// inFrame is true after a frame header was read and until its last block is decoded.
		inFrame bool
	}

	// frame is the frame decoder used for stream decoding; Reset creates it on first use.
	frame *frameDec

	// Custom dictionaries.
	// Always uses copies.
	dicts map[uint32]dict

	// streamWg is the waitgroup for all streams
	streamWg sync.WaitGroup
}
| 49 | |
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// Embedded, so its d, b and err fields are reachable directly on decoderState.
	decodeOutput

	// output in order to be written to stream.
	output chan decodeOutput

	// cancel remaining output.
	cancel context.CancelFunc

	// crc of current frame
	crc *xxhash.Digest

	// flushed is true when there is no pending stream output left to drain;
	// Reset clears it when a new stream is started.
	flushed bool
}
| 67 | |
| 68 | var ( |
| 69 | // Check the interfaces we want to support. |
| 70 | _ = io.WriterTo(&Decoder{}) |
| 71 | _ = io.Reader(&Decoder{}) |
| 72 | ) |
| 73 | |
| 74 | // NewReader creates a new decoder. |
| 75 | // A nil Reader can be provided in which case Reset can be used to start a decode. |
| 76 | // |
| 77 | // A Decoder can be used in two modes: |
| 78 | // |
| 79 | // 1) As a stream, or |
| 80 | // 2) For stateless decoding using DecodeAll. |
| 81 | // |
| 82 | // Only a single stream can be decoded concurrently, but the same decoder |
| 83 | // can run multiple concurrent stateless decodes. It is even possible to |
| 84 | // use stateless decodes while a stream is being decoded. |
| 85 | // |
| 86 | // The Reset function can be used to initiate a new stream, which is will considerably |
| 87 | // reduce the allocations normally caused by NewReader. |
| 88 | func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) { |
| 89 | initPredefined() |
| 90 | var d Decoder |
| 91 | d.o.setDefault() |
| 92 | for _, o := range opts { |
| 93 | err := o(&d.o) |
| 94 | if err != nil { |
| 95 | return nil, err |
| 96 | } |
| 97 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 98 | d.current.crc = xxhash.New() |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 99 | d.current.flushed = true |
| 100 | |
| 101 | if r == nil { |
| 102 | d.current.err = ErrDecoderNilInput |
| 103 | } |
| 104 | |
| 105 | // Transfer option dicts. |
| 106 | d.dicts = make(map[uint32]dict, len(d.o.dicts)) |
| 107 | for _, dc := range d.o.dicts { |
| 108 | d.dicts[dc.id] = dc |
| 109 | } |
| 110 | d.o.dicts = nil |
| 111 | |
| 112 | // Create decoders |
| 113 | d.decoders = make(chan *blockDec, d.o.concurrent) |
| 114 | for i := 0; i < d.o.concurrent; i++ { |
| 115 | dec := newBlockDec(d.o.lowMem) |
| 116 | dec.localFrame = newFrameDec(d.o) |
| 117 | d.decoders <- dec |
| 118 | } |
| 119 | |
| 120 | if r == nil { |
| 121 | return &d, nil |
| 122 | } |
| 123 | return &d, d.Reset(r) |
| 124 | } |
| 125 | |
| 126 | // Read bytes from the decompressed stream into p. |
| 127 | // Returns the number of bytes written and any error that occurred. |
| 128 | // When the stream is done, io.EOF will be returned. |
| 129 | func (d *Decoder) Read(p []byte) (int, error) { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 130 | var n int |
| 131 | for { |
| 132 | if len(d.current.b) > 0 { |
| 133 | filled := copy(p, d.current.b) |
| 134 | p = p[filled:] |
| 135 | d.current.b = d.current.b[filled:] |
| 136 | n += filled |
| 137 | } |
| 138 | if len(p) == 0 { |
| 139 | break |
| 140 | } |
| 141 | if len(d.current.b) == 0 { |
| 142 | // We have an error and no more data |
| 143 | if d.current.err != nil { |
| 144 | break |
| 145 | } |
| 146 | if !d.nextBlock(n == 0) { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 147 | return n, d.current.err |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 148 | } |
| 149 | } |
| 150 | } |
| 151 | if len(d.current.b) > 0 { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 152 | if debugDecoder { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 153 | println("returning", n, "still bytes left:", len(d.current.b)) |
| 154 | } |
| 155 | // Only return error at end of block |
| 156 | return n, nil |
| 157 | } |
| 158 | if d.current.err != nil { |
| 159 | d.drainOutput() |
| 160 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 161 | if debugDecoder { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 162 | println("returning", n, d.current.err, len(d.decoders)) |
| 163 | } |
| 164 | return n, d.current.err |
| 165 | } |
| 166 | |
// Reset will reset the decoder to the supplied stream after the current has finished processing.
// Note that this functionality cannot be used after Close has been called.
// Reset can be called with a nil reader to release references to the previous reader.
// After being called with a nil reader, no other operations than Reset or DecodeAll or Close
// should be used.
//
// Reset returns ErrDecoderClosed if the decoder was closed. Errors from decoding
// a small in-memory input (see below) are not returned here; they are stored
// and surface on the next read.
func (d *Decoder) Reset(r io.Reader) error {
	if d.current.err == ErrDecoderClosed {
		return d.current.err
	}

	// Cancel and drain whatever the previous stream still has queued.
	d.drainOutput()

	// Drop the reference to the previous reader.
	d.syncStream.br.r = nil
	if r == nil {
		d.current.err = ErrDecoderNilInput
		if len(d.current.b) > 0 {
			d.current.b = d.current.b[:0]
		}
		d.current.flushed = true
		return nil
	}

	// If bytes buffer and < 5MB, do sync decoding anyway.
	if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
		bb2 := bb
		if debugDecoder {
			println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
		}
		b := bb2.Bytes()
		// Reuse the previous output buffer when it has capacity.
		var dst []byte
		if cap(d.current.b) > 0 {
			dst = d.current.b
		}

		dst, err := d.DecodeAll(b, dst[:0])
		// The whole input is decoded up front, so a successful decode
		// leaves the stream at EOF once the buffered bytes are consumed.
		if err == nil {
			err = io.EOF
		}
		d.current.b = dst
		d.current.err = err
		d.current.flushed = true
		if debugDecoder {
			println("sync decode to", len(dst), "bytes, err:", err)
		}
		return nil
	}
	// Remove current block.
	d.stashDecoder()
	d.current.decodeOutput = decodeOutput{}
	d.current.err = nil
	d.current.flushed = false
	d.current.d = nil

	// Ensure no-one else is still running...
	d.streamWg.Wait()
	if d.frame == nil {
		d.frame = newFrameDec(d.o)
	}

	// With a single decoder, blocks are decoded synchronously on demand.
	if d.o.concurrent == 1 {
		return d.startSyncDecoder(r)
	}

	// Otherwise start the asynchronous stream decoder; it is tracked by
	// streamWg and stopped through the stored cancel function.
	d.current.output = make(chan decodeOutput, d.o.concurrent)
	ctx, cancel := context.WithCancel(context.Background())
	d.current.cancel = cancel
	d.streamWg.Add(1)
	go d.startStreamDecoder(ctx, r, d.current.output)

	return nil
}
| 238 | |
| 239 | // drainOutput will drain the output until errEndOfStream is sent. |
| 240 | func (d *Decoder) drainOutput() { |
| 241 | if d.current.cancel != nil { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 242 | if debugDecoder { |
| 243 | println("cancelling current") |
| 244 | } |
| 245 | d.current.cancel() |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 246 | d.current.cancel = nil |
| 247 | } |
| 248 | if d.current.d != nil { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 249 | if debugDecoder { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 250 | printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders)) |
| 251 | } |
| 252 | d.decoders <- d.current.d |
| 253 | d.current.d = nil |
| 254 | d.current.b = nil |
| 255 | } |
| 256 | if d.current.output == nil || d.current.flushed { |
| 257 | println("current already flushed") |
| 258 | return |
| 259 | } |
| 260 | for v := range d.current.output { |
| 261 | if v.d != nil { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 262 | if debugDecoder { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 263 | printf("re-adding decoder %p", v.d) |
| 264 | } |
| 265 | d.decoders <- v.d |
| 266 | } |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 267 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 268 | d.current.output = nil |
| 269 | d.current.flushed = true |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 270 | } |
| 271 | |
| 272 | // WriteTo writes data to w until there's no more data to write or when an error occurs. |
| 273 | // The return value n is the number of bytes written. |
| 274 | // Any error encountered during the write is also returned. |
| 275 | func (d *Decoder) WriteTo(w io.Writer) (int64, error) { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 276 | var n int64 |
| 277 | for { |
| 278 | if len(d.current.b) > 0 { |
| 279 | n2, err2 := w.Write(d.current.b) |
| 280 | n += int64(n2) |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 281 | if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 282 | d.current.err = err2 |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 283 | } else if n2 != len(d.current.b) { |
| 284 | d.current.err = io.ErrShortWrite |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 285 | } |
| 286 | } |
| 287 | if d.current.err != nil { |
| 288 | break |
| 289 | } |
| 290 | d.nextBlock(true) |
| 291 | } |
| 292 | err := d.current.err |
| 293 | if err != nil { |
| 294 | d.drainOutput() |
| 295 | } |
| 296 | if err == io.EOF { |
| 297 | err = nil |
| 298 | } |
| 299 | return n, err |
| 300 | } |
| 301 | |
// DecodeAll allows stateless decoding of a blob of bytes.
// Output will be appended to dst, so if the destination size is known
// you can pre-allocate the destination slice to avoid allocations.
// DecodeAll can be used concurrently.
// The Decoder concurrency limits will be respected.
//
// input may contain several frames back to back; they are decoded in order
// until the input is exhausted. ErrDecoderClosed is returned if the decoder
// has been closed.
func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
	if d.decoders == nil {
		return dst, ErrDecoderClosed
	}

	// Grab a block decoder and frame decoder.
	// This blocks until a decoder is available in the pool.
	block := <-d.decoders
	frame := block.localFrame
	defer func() {
		if debugDecoder {
			printf("re-adding decoder: %p", block)
		}
		// Drop references to the caller's input before returning the decoder to the pool.
		frame.rawInput = nil
		frame.bBuf = nil
		if frame.history.decoders.br != nil {
			frame.history.decoders.br.in = nil
		}
		d.decoders <- block
	}()
	frame.bBuf = input

	// One iteration per frame found in input.
	for {
		frame.history.reset()
		err := frame.reset(&frame.bBuf)
		if err != nil {
			// io.EOF from the frame header reader means no further frame; not an error.
			if err == io.EOF {
				if debugDecoder {
					println("frame reset return EOF")
				}
				return dst, nil
			}
			return dst, err
		}
		if frame.DictionaryID != nil {
			dict, ok := d.dicts[*frame.DictionaryID]
			if !ok {
				// NOTE(review): this is the only error path returning nil instead of dst — confirm intended.
				return nil, ErrUnknownDictionary
			}
			if debugDecoder {
				println("setting dict", frame.DictionaryID)
			}
			frame.history.setDict(&dict)
		}
		if frame.WindowSize > d.o.maxWindowSize {
			if debugDecoder {
				println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
			}
			return dst, ErrWindowSizeExceeded
		}
		if frame.FrameContentSize != fcsUnknown {
			// NOTE(review): the unsigned subtraction wraps if len(dst) exceeds maxDecodedSize — confirm callers cannot hit this.
			if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
				return dst, ErrDecoderSizeExceeded
			}
			// The content size is known: grow dst once so the whole frame fits.
			if cap(dst)-len(dst) < int(frame.FrameContentSize) {
				dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
				copy(dst2, dst)
				dst = dst2
			}
		}

		if cap(dst) == 0 {
			// Allocate len(input) * 2 by default if nothing is provided
			// and we didn't get frame content size.
			size := len(input) * 2
			// Cap to 1 MB.
			if size > 1<<20 {
				size = 1 << 20
			}
			if uint64(size) > d.o.maxDecodedSize {
				size = int(d.o.maxDecodedSize)
			}
			dst = make([]byte, 0, size)
		}

		dst, err = frame.runDecoder(dst, block)
		if err != nil {
			return dst, err
		}
		// All input consumed: done.
		if len(frame.bBuf) == 0 {
			if debugDecoder {
				println("frame dbuf empty")
			}
			break
		}
	}
	return dst, nil
}
| 394 | |
// nextBlock fetches the next decoded block into d.current.
// If an error occurs d.current.err will be set.
// Optionally the function can block for new output.
// If non-blocking mode is used the returned boolean will be false
// if no data was available without blocking.
// In synchronous mode a non-blocking call always returns false.
func (d *Decoder) nextBlock(blocking bool) (ok bool) {
	if d.current.err != nil {
		// Keep error state.
		return false
	}
	d.current.b = d.current.b[:0]

	// SYNC:
	if d.syncStream.enabled {
		if !blocking {
			return false
		}
		ok = d.nextBlockSync()
		if !ok {
			// Decoding stopped; hand the block decoder back to the pool.
			d.stashDecoder()
		}
		return ok
	}

	// ASYNC:
	d.stashDecoder()
	// Receiving replaces the embedded decodeOutput, i.e. d.current.d,
	// d.current.b and d.current.err all come from the received value.
	if blocking {
		d.current.decodeOutput, ok = <-d.current.output
	} else {
		select {
		case d.current.decodeOutput, ok = <-d.current.output:
		default:
			return false
		}
	}
	if !ok {
		// This should not happen, so signal error state...
		d.current.err = io.ErrUnexpectedEOF
		return false
	}
	next := d.current.decodeOutput
	// A block carrying a new history restarts the running checksum.
	if next.d != nil && next.d.async.newHist != nil {
		d.current.crc.Reset()
	}
	if debugDecoder {
		var tmp [4]byte
		binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
		println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
	}

	// Feed the block's bytes into the running checksum.
	if !d.o.ignoreChecksum && len(next.b) > 0 {
		n, err := d.current.crc.Write(next.b)
		if err == nil {
			if n != len(next.b) {
				d.current.err = io.ErrShortWrite
			}
		}
	}
	// The block carries an expected checksum: compare it with the low
	// 32 bits (little endian) of the running digest.
	if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 {
		got := d.current.crc.Sum64()
		var tmp [4]byte
		binary.LittleEndian.PutUint32(tmp[:], uint32(got))
		if !d.o.ignoreChecksum && !bytes.Equal(tmp[:], next.d.checkCRC) {
			if debugDecoder {
				println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)")
			}
			d.current.err = ErrCRCMismatch
		} else {
			if debugDecoder {
				println("CRC ok", tmp[:])
			}
		}
	}

	return true
}
| 471 | |
// nextBlockSync decodes blocks synchronously from d.syncStream.br until
// d.current.b holds output. It returns false and sets d.current.err when
// decoding cannot continue (including when the frame reader reports an error
// while looking for the next frame).
func (d *Decoder) nextBlockSync() (ok bool) {
	// Take a block decoder from the pool unless one is already held.
	if d.current.d == nil {
		d.current.d = <-d.decoders
	}
	// Blocks may decode to zero bytes, so keep going until there is output.
	for len(d.current.b) == 0 {
		if !d.syncStream.inFrame {
			// Start of a new frame: read its header and validate it.
			d.frame.history.reset()
			d.current.err = d.frame.reset(&d.syncStream.br)
			if d.current.err != nil {
				return false
			}
			if d.frame.DictionaryID != nil {
				dict, ok := d.dicts[*d.frame.DictionaryID]
				if !ok {
					d.current.err = ErrUnknownDictionary
					return false
				} else {
					d.frame.history.setDict(&dict)
				}
			}
			if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
				d.current.err = ErrDecoderSizeExceeded
				return false
			}

			d.syncStream.decodedFrame = 0
			d.syncStream.inFrame = true
		}
		d.current.err = d.frame.next(d.current.d)
		if d.current.err != nil {
			return false
		}
		d.frame.history.ensureBlock()
		if debugDecoder {
			println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
		}
		// The block is decoded onto the end of the history buffer;
		// everything past histBefore is this block's output.
		histBefore := len(d.frame.history.b)
		d.current.err = d.current.d.decodeBuf(&d.frame.history)

		if d.current.err != nil {
			println("error after:", d.current.err)
			return false
		}
		d.current.b = d.frame.history.b[histBefore:]
		if debugDecoder {
			println("history after:", len(d.frame.history.b))
		}

		// Check frame size (before CRC)
		// NOTE(review): this comparison is not guarded by fcsUnknown; presumably
		// fcsUnknown is large enough never to be exceeded — confirm.
		d.syncStream.decodedFrame += uint64(len(d.current.b))
		if d.syncStream.decodedFrame > d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeExceeded
			return false
		}

		// Check FCS
		if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeMismatch
			return false
		}

		// Update/Check CRC
		if d.frame.HasCheckSum {
			if !d.o.ignoreChecksum {
				d.frame.crc.Write(d.current.b)
			}
			if d.current.d.Last {
				// Last block of the frame: verify the checksum, or consume
				// it without verification when checksums are ignored.
				if !d.o.ignoreChecksum {
					d.current.err = d.frame.checkCRC()
				} else {
					d.current.err = d.frame.consumeCRC()
				}
				if d.current.err != nil {
					println("CRC error:", d.current.err)
					return false
				}
			}
		}
		// After the last block, the next iteration starts a new frame.
		d.syncStream.inFrame = !d.current.d.Last
	}
	return true
}
| 560 | |
| 561 | func (d *Decoder) stashDecoder() { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 562 | if d.current.d != nil { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 563 | if debugDecoder { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 564 | printf("re-adding current decoder %p", d.current.d) |
| 565 | } |
| 566 | d.decoders <- d.current.d |
| 567 | d.current.d = nil |
| 568 | } |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 569 | } |
| 570 | |
| 571 | // Close will release all resources. |
| 572 | // It is NOT possible to reuse the decoder after this. |
| 573 | func (d *Decoder) Close() { |
| 574 | if d.current.err == ErrDecoderClosed { |
| 575 | return |
| 576 | } |
| 577 | d.drainOutput() |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 578 | if d.current.cancel != nil { |
| 579 | d.current.cancel() |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 580 | d.streamWg.Wait() |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 581 | d.current.cancel = nil |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 582 | } |
| 583 | if d.decoders != nil { |
| 584 | close(d.decoders) |
| 585 | for dec := range d.decoders { |
| 586 | dec.Close() |
| 587 | } |
| 588 | d.decoders = nil |
| 589 | } |
| 590 | if d.current.d != nil { |
| 591 | d.current.d.Close() |
| 592 | d.current.d = nil |
| 593 | } |
| 594 | d.current.err = ErrDecoderClosed |
| 595 | } |
| 596 | |
| 597 | // IOReadCloser returns the decoder as an io.ReadCloser for convenience. |
| 598 | // Any changes to the decoder will be reflected, so the returned ReadCloser |
| 599 | // can be reused along with the decoder. |
| 600 | // io.WriterTo is also supported by the returned ReadCloser. |
| 601 | func (d *Decoder) IOReadCloser() io.ReadCloser { |
| 602 | return closeWrapper{d: d} |
| 603 | } |
| 604 | |
| 605 | // closeWrapper wraps a function call as a closer. |
| 606 | type closeWrapper struct { |
| 607 | d *Decoder |
| 608 | } |
| 609 | |
| 610 | // WriteTo forwards WriteTo calls to the decoder. |
| 611 | func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) { |
| 612 | return c.d.WriteTo(w) |
| 613 | } |
| 614 | |
| 615 | // Read forwards read calls to the decoder. |
| 616 | func (c closeWrapper) Read(p []byte) (n int, err error) { |
| 617 | return c.d.Read(p) |
| 618 | } |
| 619 | |
| 620 | // Close closes the decoder. |
| 621 | func (c closeWrapper) Close() error { |
| 622 | c.d.Close() |
| 623 | return nil |
| 624 | } |
| 625 | |
// decodeOutput is one unit of decoded stream output.
type decodeOutput struct {
	// d is the block decoder associated with this output, if any;
	// consumers return it to the decoder pool when done.
	d *blockDec
	// b is the decoded data.
	b []byte
	// err is the error associated with this output, if any.
	err error
}
| 631 | |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 632 | func (d *Decoder) startSyncDecoder(r io.Reader) error { |
| 633 | d.frame.history.reset() |
| 634 | d.syncStream.br = readerWrapper{r: r} |
| 635 | d.syncStream.inFrame = false |
| 636 | d.syncStream.enabled = true |
| 637 | d.syncStream.decodedFrame = 0 |
| 638 | return nil |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 639 | } |
| 640 | |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 641 | // Create Decoder: |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 642 | // ASYNC: |
| 643 | // Spawn 3 go routines. |
| 644 | // 0: Read frames and decode block literals. |
| 645 | // 1: Decode sequences. |
| 646 | // 2: Execute sequences, send to output. |
| 647 | func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 648 | defer d.streamWg.Done() |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 649 | br := readerWrapper{r: r} |
| 650 | |
| 651 | var seqDecode = make(chan *blockDec, d.o.concurrent) |
| 652 | var seqExecute = make(chan *blockDec, d.o.concurrent) |
| 653 | |
| 654 | // Async 1: Decode sequences... |
| 655 | go func() { |
| 656 | var hist history |
| 657 | var hasErr bool |
| 658 | |
| 659 | for block := range seqDecode { |
| 660 | if hasErr { |
| 661 | if block != nil { |
| 662 | seqExecute <- block |
| 663 | } |
| 664 | continue |
| 665 | } |
| 666 | if block.async.newHist != nil { |
| 667 | if debugDecoder { |
| 668 | println("Async 1: new history, recent:", block.async.newHist.recentOffsets) |
| 669 | } |
| 670 | hist.decoders = block.async.newHist.decoders |
| 671 | hist.recentOffsets = block.async.newHist.recentOffsets |
| 672 | hist.windowSize = block.async.newHist.windowSize |
| 673 | if block.async.newHist.dict != nil { |
| 674 | hist.setDict(block.async.newHist.dict) |
| 675 | } |
| 676 | } |
| 677 | if block.err != nil || block.Type != blockTypeCompressed { |
| 678 | hasErr = block.err != nil |
| 679 | seqExecute <- block |
| 680 | continue |
| 681 | } |
| 682 | |
| 683 | hist.decoders.literals = block.async.literals |
| 684 | block.err = block.prepareSequences(block.async.seqData, &hist) |
| 685 | if debugDecoder && block.err != nil { |
| 686 | println("prepareSequences returned:", block.err) |
| 687 | } |
| 688 | hasErr = block.err != nil |
| 689 | if block.err == nil { |
| 690 | block.err = block.decodeSequences(&hist) |
| 691 | if debugDecoder && block.err != nil { |
| 692 | println("decodeSequences returned:", block.err) |
| 693 | } |
| 694 | hasErr = block.err != nil |
| 695 | // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs] |
| 696 | block.async.seqSize = hist.decoders.seqSize |
| 697 | } |
| 698 | seqExecute <- block |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 699 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 700 | close(seqExecute) |
| 701 | }() |
| 702 | |
| 703 | var wg sync.WaitGroup |
| 704 | wg.Add(1) |
| 705 | |
| 706 | // Async 3: Execute sequences... |
| 707 | frameHistCache := d.frame.history.b |
| 708 | go func() { |
| 709 | var hist history |
| 710 | var decodedFrame uint64 |
| 711 | var fcs uint64 |
| 712 | var hasErr bool |
| 713 | for block := range seqExecute { |
| 714 | out := decodeOutput{err: block.err, d: block} |
| 715 | if block.err != nil || hasErr { |
| 716 | hasErr = true |
| 717 | output <- out |
| 718 | continue |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 719 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 720 | if block.async.newHist != nil { |
| 721 | if debugDecoder { |
| 722 | println("Async 2: new history") |
| 723 | } |
| 724 | hist.windowSize = block.async.newHist.windowSize |
| 725 | hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer |
| 726 | if block.async.newHist.dict != nil { |
| 727 | hist.setDict(block.async.newHist.dict) |
| 728 | } |
| 729 | |
| 730 | if cap(hist.b) < hist.allocFrameBuffer { |
| 731 | if cap(frameHistCache) >= hist.allocFrameBuffer { |
| 732 | hist.b = frameHistCache |
| 733 | } else { |
| 734 | hist.b = make([]byte, 0, hist.allocFrameBuffer) |
| 735 | println("Alloc history sized", hist.allocFrameBuffer) |
| 736 | } |
| 737 | } |
| 738 | hist.b = hist.b[:0] |
| 739 | fcs = block.async.fcs |
| 740 | decodedFrame = 0 |
| 741 | } |
| 742 | do := decodeOutput{err: block.err, d: block} |
| 743 | switch block.Type { |
| 744 | case blockTypeRLE: |
| 745 | if debugDecoder { |
| 746 | println("add rle block length:", block.RLESize) |
| 747 | } |
| 748 | |
| 749 | if cap(block.dst) < int(block.RLESize) { |
| 750 | if block.lowMem { |
| 751 | block.dst = make([]byte, block.RLESize) |
| 752 | } else { |
| 753 | block.dst = make([]byte, maxBlockSize) |
| 754 | } |
| 755 | } |
| 756 | block.dst = block.dst[:block.RLESize] |
| 757 | v := block.data[0] |
| 758 | for i := range block.dst { |
| 759 | block.dst[i] = v |
| 760 | } |
| 761 | hist.append(block.dst) |
| 762 | do.b = block.dst |
| 763 | case blockTypeRaw: |
| 764 | if debugDecoder { |
| 765 | println("add raw block length:", len(block.data)) |
| 766 | } |
| 767 | hist.append(block.data) |
| 768 | do.b = block.data |
| 769 | case blockTypeCompressed: |
| 770 | if debugDecoder { |
| 771 | println("execute with history length:", len(hist.b), "window:", hist.windowSize) |
| 772 | } |
| 773 | hist.decoders.seqSize = block.async.seqSize |
| 774 | hist.decoders.literals = block.async.literals |
| 775 | do.err = block.executeSequences(&hist) |
| 776 | hasErr = do.err != nil |
| 777 | if debugDecoder && hasErr { |
| 778 | println("executeSequences returned:", do.err) |
| 779 | } |
| 780 | do.b = block.dst |
| 781 | } |
| 782 | if !hasErr { |
| 783 | decodedFrame += uint64(len(do.b)) |
| 784 | if decodedFrame > fcs { |
| 785 | println("fcs exceeded", block.Last, fcs, decodedFrame) |
| 786 | do.err = ErrFrameSizeExceeded |
| 787 | hasErr = true |
| 788 | } else if block.Last && fcs != fcsUnknown && decodedFrame != fcs { |
| 789 | do.err = ErrFrameSizeMismatch |
| 790 | hasErr = true |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 791 | } else { |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 792 | if debugDecoder { |
| 793 | println("fcs ok", block.Last, fcs, decodedFrame) |
| 794 | } |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 795 | } |
| 796 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 797 | output <- do |
| 798 | } |
| 799 | close(output) |
| 800 | frameHistCache = hist.b |
| 801 | wg.Done() |
| 802 | if debugDecoder { |
| 803 | println("decoder goroutines finished") |
| 804 | } |
| 805 | }() |
| 806 | |
| 807 | decodeStream: |
| 808 | for { |
| 809 | var hist history |
| 810 | var hasErr bool |
| 811 | |
| 812 | decodeBlock := func(block *blockDec) { |
| 813 | if hasErr { |
| 814 | if block != nil { |
| 815 | seqDecode <- block |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 816 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 817 | return |
| 818 | } |
| 819 | if block.err != nil || block.Type != blockTypeCompressed { |
| 820 | hasErr = block.err != nil |
| 821 | seqDecode <- block |
| 822 | return |
| 823 | } |
| 824 | |
| 825 | remain, err := block.decodeLiterals(block.data, &hist) |
| 826 | block.err = err |
| 827 | hasErr = block.err != nil |
| 828 | if err == nil { |
| 829 | block.async.literals = hist.decoders.literals |
| 830 | block.async.seqData = remain |
| 831 | } else if debugDecoder { |
| 832 | println("decodeLiterals error:", err) |
| 833 | } |
| 834 | seqDecode <- block |
| 835 | } |
| 836 | frame := d.frame |
| 837 | if debugDecoder { |
| 838 | println("New frame...") |
| 839 | } |
| 840 | var historySent bool |
| 841 | frame.history.reset() |
| 842 | err := frame.reset(&br) |
| 843 | if debugDecoder && err != nil { |
| 844 | println("Frame decoder returned", err) |
| 845 | } |
| 846 | if err == nil && frame.DictionaryID != nil { |
| 847 | dict, ok := d.dicts[*frame.DictionaryID] |
| 848 | if !ok { |
| 849 | err = ErrUnknownDictionary |
| 850 | } else { |
| 851 | frame.history.setDict(&dict) |
| 852 | } |
| 853 | } |
| 854 | if err == nil && d.frame.WindowSize > d.o.maxWindowSize { |
| 855 | err = ErrDecoderSizeExceeded |
| 856 | } |
| 857 | if err != nil { |
| 858 | select { |
| 859 | case <-ctx.Done(): |
| 860 | case dec := <-d.decoders: |
| 861 | dec.sendErr(err) |
| 862 | decodeBlock(dec) |
| 863 | } |
| 864 | break decodeStream |
| 865 | } |
| 866 | |
| 867 | // Go through all blocks of the frame. |
| 868 | for { |
| 869 | var dec *blockDec |
| 870 | select { |
| 871 | case <-ctx.Done(): |
| 872 | break decodeStream |
| 873 | case dec = <-d.decoders: |
| 874 | // Once we have a decoder, we MUST return it. |
| 875 | } |
| 876 | err := frame.next(dec) |
| 877 | if !historySent { |
| 878 | h := frame.history |
| 879 | if debugDecoder { |
| 880 | println("Alloc History:", h.allocFrameBuffer) |
| 881 | } |
| 882 | hist.reset() |
| 883 | if h.dict != nil { |
| 884 | hist.setDict(h.dict) |
| 885 | } |
| 886 | dec.async.newHist = &h |
| 887 | dec.async.fcs = frame.FrameContentSize |
| 888 | historySent = true |
| 889 | } else { |
| 890 | dec.async.newHist = nil |
| 891 | } |
| 892 | if debugDecoder && err != nil { |
| 893 | println("next block returned error:", err) |
| 894 | } |
| 895 | dec.err = err |
| 896 | dec.checkCRC = nil |
| 897 | if dec.Last && frame.HasCheckSum && err == nil { |
| 898 | crc, err := frame.rawInput.readSmall(4) |
| 899 | if err != nil { |
| 900 | println("CRC missing?", err) |
| 901 | dec.err = err |
| 902 | } |
| 903 | var tmp [4]byte |
| 904 | copy(tmp[:], crc) |
| 905 | dec.checkCRC = tmp[:] |
| 906 | if debugDecoder { |
| 907 | println("found crc to check:", dec.checkCRC) |
| 908 | } |
| 909 | } |
| 910 | err = dec.err |
| 911 | last := dec.Last |
| 912 | decodeBlock(dec) |
| 913 | if err != nil { |
| 914 | break decodeStream |
| 915 | } |
| 916 | if last { |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 917 | break |
| 918 | } |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 919 | } |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 920 | } |
Akash Reddy Kankanala | c28f0e2 | 2025-06-16 11:00:55 +0530 | [diff] [blame^] | 921 | close(seqDecode) |
| 922 | wg.Wait() |
| 923 | d.frame.history.b = frameHistCache |
khenaidoo | 7d3c558 | 2021-08-11 18:09:44 -0400 | [diff] [blame] | 924 | } |