[VOL-5292] Implementation for fetching the GEM port history data from the ONT
Change-Id: I4cf22555cbd13bcd5e49e620c8aa8b67cbd2891c
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/github.com/klauspost/compress/zstd/framedec.go b/vendor/github.com/klauspost/compress/zstd/framedec.go
index 693c5f0..9568a4b 100644
--- a/vendor/github.com/klauspost/compress/zstd/framedec.go
+++ b/vendor/github.com/klauspost/compress/zstd/framedec.go
@@ -8,27 +8,17 @@
"bytes"
"encoding/hex"
"errors"
- "hash"
"io"
- "sync"
"github.com/klauspost/compress/zstd/internal/xxhash"
)
type frameDec struct {
- o decoderOptions
- crc hash.Hash64
- offset int64
+ o decoderOptions
+ crc *xxhash.Digest
WindowSize uint64
- // maxWindowSize is the maximum windows size to support.
- // should never be bigger than max-int.
- maxWindowSize uint64
-
- // In order queue of blocks being decoded.
- decoding chan *blockDec
-
// Frame history passed between blocks
history history
@@ -38,20 +28,18 @@
bBuf byteBuf
FrameContentSize uint64
- frameDone sync.WaitGroup
DictionaryID *uint32
HasCheckSum bool
SingleSegment bool
-
- // asyncRunning indicates whether the async routine processes input on 'decoding'.
- asyncRunningMu sync.Mutex
- asyncRunning bool
}
const (
- // The minimum Window_Size is 1 KB.
+ // MinWindowSize is the minimum Window Size, which is 1 KB.
MinWindowSize = 1 << 10
+
+ // MaxWindowSize is the maximum encoder window size
+ // and the default decoder maximum window size.
MaxWindowSize = 1 << 29
)
@@ -61,12 +49,11 @@
)
func newFrameDec(o decoderOptions) *frameDec {
- d := frameDec{
- o: o,
- maxWindowSize: MaxWindowSize,
+ if o.maxWindowSize > o.maxDecodedSize {
+ o.maxWindowSize = o.maxDecodedSize
}
- if d.maxWindowSize > o.maxDecodedSize {
- d.maxWindowSize = o.maxDecodedSize
+ d := frameDec{
+ o: o,
}
return &d
}
@@ -78,44 +65,68 @@
func (d *frameDec) reset(br byteBuffer) error {
d.HasCheckSum = false
d.WindowSize = 0
- var b []byte
+ var signature [4]byte
for {
- b = br.readSmall(4)
- if b == nil {
+ var err error
+ // Check if we can read more...
+ b, err := br.readSmall(1)
+ switch err {
+ case io.EOF, io.ErrUnexpectedEOF:
return io.EOF
+ default:
+ return err
+ case nil:
+ signature[0] = b[0]
}
- if !bytes.Equal(b[1:4], skippableFrameMagic) || b[0]&0xf0 != 0x50 {
- if debug {
- println("Not skippable", hex.EncodeToString(b), hex.EncodeToString(skippableFrameMagic))
+ // Read the rest, don't allow io.ErrUnexpectedEOF
+ b, err = br.readSmall(3)
+ switch err {
+ case io.EOF:
+ return io.EOF
+ default:
+ return err
+ case nil:
+ copy(signature[1:], b)
+ }
+
+ if !bytes.Equal(signature[1:4], skippableFrameMagic) || signature[0]&0xf0 != 0x50 {
+ if debugDecoder {
+ println("Not skippable", hex.EncodeToString(signature[:]), hex.EncodeToString(skippableFrameMagic))
}
// Break if not skippable frame.
break
}
// Read size to skip
- b = br.readSmall(4)
- if b == nil {
- println("Reading Frame Size EOF")
- return io.ErrUnexpectedEOF
+ b, err = br.readSmall(4)
+ if err != nil {
+ if debugDecoder {
+ println("Reading Frame Size", err)
+ }
+ return err
}
n := uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
println("Skipping frame with", n, "bytes.")
- err := br.skipN(int(n))
+ err = br.skipN(int64(n))
if err != nil {
- if debug {
+ if debugDecoder {
println("Reading discarded frame", err)
}
return err
}
}
- if !bytes.Equal(b, frameMagic) {
- println("Got magic numbers: ", b, "want:", frameMagic)
+ if !bytes.Equal(signature[:], frameMagic) {
+ if debugDecoder {
+ println("Got magic numbers: ", signature, "want:", frameMagic)
+ }
return ErrMagicMismatch
}
// Read Frame_Header_Descriptor
fhd, err := br.readByte()
if err != nil {
- println("Reading Frame_Header_Descriptor", err)
+ if debugDecoder {
+ println("Reading Frame_Header_Descriptor", err)
+ }
return err
}
d.SingleSegment = fhd&(1<<5) != 0
@@ -130,7 +141,9 @@
if !d.SingleSegment {
wd, err := br.readByte()
if err != nil {
- println("Reading Window_Descriptor", err)
+ if debugDecoder {
+ println("Reading Window_Descriptor", err)
+ }
return err
}
printf("raw: %x, mantissa: %d, exponent: %d\n", wd, wd&7, wd>>3)
@@ -147,12 +160,11 @@
if size == 3 {
size = 4
}
- b = br.readSmall(int(size))
- if b == nil {
- if debug {
- println("Reading Dictionary_ID", io.ErrUnexpectedEOF)
- }
- return io.ErrUnexpectedEOF
+
+ b, err := br.readSmall(int(size))
+ if err != nil {
+ println("Reading Dictionary_ID", err)
+ return err
}
var id uint32
switch size {
@@ -163,7 +175,7 @@
case 4:
id = uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
}
- if debug {
+ if debugDecoder {
println("Dict size", size, "ID:", id)
}
if id > 0 {
@@ -185,12 +197,12 @@
default:
fcsSize = 1 << v
}
- d.FrameContentSize = 0
+ d.FrameContentSize = fcsUnknown
if fcsSize > 0 {
- b := br.readSmall(fcsSize)
- if b == nil {
- println("Reading Frame content", io.ErrUnexpectedEOF)
- return io.ErrUnexpectedEOF
+ b, err := br.readSmall(fcsSize)
+ if err != nil {
+ println("Reading Frame content", err)
+ return err
}
switch fcsSize {
case 1:
@@ -205,10 +217,11 @@
d2 := uint32(b[4]) | (uint32(b[5]) << 8) | (uint32(b[6]) << 16) | (uint32(b[7]) << 24)
d.FrameContentSize = uint64(d1) | (uint64(d2) << 32)
}
- if debug {
- println("field size bits:", v, "fcsSize:", fcsSize, "FrameContentSize:", d.FrameContentSize, hex.EncodeToString(b[:fcsSize]), "singleseg:", d.SingleSegment, "window:", d.WindowSize)
+ if debugDecoder {
+ println("Read FCS:", d.FrameContentSize)
}
}
+
// Move this to shared.
d.HasCheckSum = fhd&(1<<2) != 0
if d.HasCheckSum {
@@ -218,29 +231,47 @@
d.crc.Reset()
}
+ if d.WindowSize > d.o.maxWindowSize {
+ if debugDecoder {
+ printf("window size %d > max %d\n", d.WindowSize, d.o.maxWindowSize)
+ }
+ return ErrWindowSizeExceeded
+ }
+
if d.WindowSize == 0 && d.SingleSegment {
// We may not need window in this case.
d.WindowSize = d.FrameContentSize
if d.WindowSize < MinWindowSize {
d.WindowSize = MinWindowSize
}
+ if d.WindowSize > d.o.maxDecodedSize {
+ if debugDecoder {
+ printf("window size %d > max %d\n", d.WindowSize, d.o.maxWindowSize)
+ }
+ return ErrDecoderSizeExceeded
+ }
}
- if d.WindowSize > d.maxWindowSize {
- printf("window size %d > max %d\n", d.WindowSize, d.maxWindowSize)
- return ErrWindowSizeExceeded
- }
// The minimum Window_Size is 1 KB.
if d.WindowSize < MinWindowSize {
- println("got window size: ", d.WindowSize)
+ if debugDecoder {
+ println("got window size: ", d.WindowSize)
+ }
return ErrWindowSizeTooSmall
}
d.history.windowSize = int(d.WindowSize)
- if d.o.lowMem && d.history.windowSize < maxBlockSize {
- d.history.maxSize = d.history.windowSize * 2
+ if !d.o.lowMem || d.history.windowSize < maxBlockSize {
+ // Alloc 2x window size if not low-mem, or very small window size.
+ d.history.allocFrameBuffer = d.history.windowSize * 2
} else {
- d.history.maxSize = d.history.windowSize + maxBlockSize
+ // Alloc with one additional block
+ d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize
}
+
+ if debugDecoder {
+ println("Frame: Dict:", d.DictionaryID, "FrameContentSize:", d.FrameContentSize, "singleseg:", d.SingleSegment, "window:", d.WindowSize, "crc:", d.HasCheckSum)
+ }
+
// history contains input - maybe we do something
d.rawInput = br
return nil
@@ -248,56 +279,37 @@
// next will start decoding the next block from stream.
func (d *frameDec) next(block *blockDec) error {
- if debug {
- printf("decoding new block %p:%p", block, block.data)
+ if debugDecoder {
+ println("decoding new block")
}
err := block.reset(d.rawInput, d.WindowSize)
if err != nil {
println("block error:", err)
// Signal the frame decoder we have a problem.
- d.sendErr(block, err)
+ block.sendErr(err)
return err
}
- block.input <- struct{}{}
- if debug {
- println("next block:", block)
- }
- d.asyncRunningMu.Lock()
- defer d.asyncRunningMu.Unlock()
- if !d.asyncRunning {
- return nil
- }
- if block.Last {
- // We indicate the frame is done by sending io.EOF
- d.decoding <- block
- return io.EOF
- }
- d.decoding <- block
return nil
}
-// sendEOF will queue an error block on the frame.
-// This will cause the frame decoder to return when it encounters the block.
-// Returns true if the decoder was added.
-func (d *frameDec) sendErr(block *blockDec, err error) bool {
- d.asyncRunningMu.Lock()
- defer d.asyncRunningMu.Unlock()
- if !d.asyncRunning {
- return false
- }
-
- println("sending error", err.Error())
- block.sendErr(err)
- d.decoding <- block
- return true
-}
-
// checkCRC will check the checksum if the frame has one.
// Will return ErrCRCMismatch if crc check failed, otherwise nil.
func (d *frameDec) checkCRC() error {
if !d.HasCheckSum {
return nil
}
+
+ // We can overwrite upper tmp now
+ want, err := d.rawInput.readSmall(4)
+ if err != nil {
+ println("CRC missing?", err)
+ return err
+ }
+
+ if d.o.ignoreChecksum {
+ return nil
+ }
+
var tmp [4]byte
got := d.crc.Sum64()
// Flip to match file order.
@@ -306,142 +318,29 @@
tmp[2] = byte(got >> 16)
tmp[3] = byte(got >> 24)
- // We can overwrite upper tmp now
- want := d.rawInput.readSmall(4)
- if want == nil {
- println("CRC missing?")
- return io.ErrUnexpectedEOF
- }
-
if !bytes.Equal(tmp[:], want) {
- if debug {
+ if debugDecoder {
println("CRC Check Failed:", tmp[:], "!=", want)
}
return ErrCRCMismatch
}
- if debug {
+ if debugDecoder {
println("CRC ok", tmp[:])
}
return nil
}
-func (d *frameDec) initAsync() {
- if !d.o.lowMem && !d.SingleSegment {
- // set max extra size history to 10MB.
- d.history.maxSize = d.history.windowSize + maxBlockSize*5
+// consumeCRC reads the checksum data if the frame has one.
+func (d *frameDec) consumeCRC() error {
+ if d.HasCheckSum {
+ _, err := d.rawInput.readSmall(4)
+ if err != nil {
+ println("CRC missing?", err)
+ return err
+ }
}
- // re-alloc if more than one extra block size.
- if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize {
- d.history.b = make([]byte, 0, d.history.maxSize)
- }
- if cap(d.history.b) < d.history.maxSize {
- d.history.b = make([]byte, 0, d.history.maxSize)
- }
- if cap(d.decoding) < d.o.concurrent {
- d.decoding = make(chan *blockDec, d.o.concurrent)
- }
- if debug {
- h := d.history
- printf("history init. len: %d, cap: %d", len(h.b), cap(h.b))
- }
- d.asyncRunningMu.Lock()
- d.asyncRunning = true
- d.asyncRunningMu.Unlock()
-}
-// startDecoder will start decoding blocks and write them to the writer.
-// The decoder will stop as soon as an error occurs or at end of frame.
-// When the frame has finished decoding the *bufio.Reader
-// containing the remaining input will be sent on frameDec.frameDone.
-func (d *frameDec) startDecoder(output chan decodeOutput) {
- written := int64(0)
-
- defer func() {
- d.asyncRunningMu.Lock()
- d.asyncRunning = false
- d.asyncRunningMu.Unlock()
-
- // Drain the currently decoding.
- d.history.error = true
- flushdone:
- for {
- select {
- case b := <-d.decoding:
- b.history <- &d.history
- output <- <-b.result
- default:
- break flushdone
- }
- }
- println("frame decoder done, signalling done")
- d.frameDone.Done()
- }()
- // Get decoder for first block.
- block := <-d.decoding
- block.history <- &d.history
- for {
- var next *blockDec
- // Get result
- r := <-block.result
- if r.err != nil {
- println("Result contained error", r.err)
- output <- r
- return
- }
- if debug {
- println("got result, from ", d.offset, "to", d.offset+int64(len(r.b)))
- d.offset += int64(len(r.b))
- }
- if !block.Last {
- // Send history to next block
- select {
- case next = <-d.decoding:
- if debug {
- println("Sending ", len(d.history.b), "bytes as history")
- }
- next.history <- &d.history
- default:
- // Wait until we have sent the block, so
- // other decoders can potentially get the decoder.
- next = nil
- }
- }
-
- // Add checksum, async to decoding.
- if d.HasCheckSum {
- n, err := d.crc.Write(r.b)
- if err != nil {
- r.err = err
- if n != len(r.b) {
- r.err = io.ErrShortWrite
- }
- output <- r
- return
- }
- }
- written += int64(len(r.b))
- if d.SingleSegment && uint64(written) > d.FrameContentSize {
- println("runDecoder: single segment and", uint64(written), ">", d.FrameContentSize)
- r.err = ErrFrameSizeExceeded
- output <- r
- return
- }
- if block.Last {
- r.err = d.checkCRC()
- output <- r
- return
- }
- output <- r
- if next == nil {
- // There was no decoder available, we wait for one now that we have sent to the writer.
- if debug {
- println("Sending ", len(d.history.b), " bytes as history")
- }
- next = <-d.decoding
- next.history <- &d.history
- }
- block = next
- }
+ return nil
}
// runDecoder will create a sync decoder that will decode a block of data.
@@ -450,41 +349,67 @@
// We use the history for output to avoid copying it.
d.history.b = dst
+ d.history.ignoreBuffer = len(dst)
// Store input length, so we only check new data.
crcStart := len(dst)
+ d.history.decoders.maxSyncLen = 0
+ if d.FrameContentSize != fcsUnknown {
+ d.history.decoders.maxSyncLen = d.FrameContentSize + uint64(len(dst))
+ if d.history.decoders.maxSyncLen > d.o.maxDecodedSize {
+ return dst, ErrDecoderSizeExceeded
+ }
+ if uint64(cap(dst)) < d.history.decoders.maxSyncLen {
+ // Alloc for output
+ dst2 := make([]byte, len(dst), d.history.decoders.maxSyncLen+compressedBlockOverAlloc)
+ copy(dst2, dst)
+ dst = dst2
+ }
+ }
var err error
for {
err = dec.reset(d.rawInput, d.WindowSize)
if err != nil {
break
}
- if debug {
+ if debugDecoder {
println("next block:", dec)
}
err = dec.decodeBuf(&d.history)
- if err != nil || dec.Last {
+ if err != nil {
break
}
if uint64(len(d.history.b)) > d.o.maxDecodedSize {
err = ErrDecoderSizeExceeded
break
}
- if d.SingleSegment && uint64(len(d.history.b)) > d.o.maxDecodedSize {
- println("runDecoder: single segment and", uint64(len(d.history.b)), ">", d.o.maxDecodedSize)
+ if uint64(len(d.history.b)-crcStart) > d.FrameContentSize {
+ println("runDecoder: FrameContentSize exceeded", uint64(len(d.history.b)-crcStart), ">", d.FrameContentSize)
err = ErrFrameSizeExceeded
break
}
+ if dec.Last {
+ break
+ }
+ if debugDecoder {
+ println("runDecoder: FrameContentSize", uint64(len(d.history.b)-crcStart), "<=", d.FrameContentSize)
+ }
}
dst = d.history.b
if err == nil {
- if d.HasCheckSum {
- var n int
- n, err = d.crc.Write(dst[crcStart:])
- if err == nil {
- if n != len(dst)-crcStart {
- err = io.ErrShortWrite
- } else {
- err = d.checkCRC()
+ if d.FrameContentSize != fcsUnknown && uint64(len(d.history.b)-crcStart) != d.FrameContentSize {
+ err = ErrFrameSizeMismatch
+ } else if d.HasCheckSum {
+ if d.o.ignoreChecksum {
+ err = d.consumeCRC()
+ } else {
+ var n int
+ n, err = d.crc.Write(dst[crcStart:])
+ if err == nil {
+ if n != len(dst)-crcStart {
+ err = io.ErrShortWrite
+ } else {
+ err = d.checkCRC()
+ }
}
}
}