[VOL-5292] Implement fetching of GEM port history data from the ONT

Change-Id: I4cf22555cbd13bcd5e49e620c8aa8b67cbd2891c
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/github.com/klauspost/compress/zstd/framedec.go b/vendor/github.com/klauspost/compress/zstd/framedec.go
index 693c5f0..9568a4b 100644
--- a/vendor/github.com/klauspost/compress/zstd/framedec.go
+++ b/vendor/github.com/klauspost/compress/zstd/framedec.go
@@ -8,27 +8,17 @@
 	"bytes"
 	"encoding/hex"
 	"errors"
-	"hash"
 	"io"
-	"sync"
 
 	"github.com/klauspost/compress/zstd/internal/xxhash"
 )
 
 type frameDec struct {
-	o      decoderOptions
-	crc    hash.Hash64
-	offset int64
+	o   decoderOptions
+	crc *xxhash.Digest
 
 	WindowSize uint64
 
-	// maxWindowSize is the maximum windows size to support.
-	// should never be bigger than max-int.
-	maxWindowSize uint64
-
-	// In order queue of blocks being decoded.
-	decoding chan *blockDec
-
 	// Frame history passed between blocks
 	history history
 
@@ -38,20 +28,18 @@
 	bBuf byteBuf
 
 	FrameContentSize uint64
-	frameDone        sync.WaitGroup
 
 	DictionaryID  *uint32
 	HasCheckSum   bool
 	SingleSegment bool
-
-	// asyncRunning indicates whether the async routine processes input on 'decoding'.
-	asyncRunningMu sync.Mutex
-	asyncRunning   bool
 }
 
 const (
-	// The minimum Window_Size is 1 KB.
+	// MinWindowSize is the minimum Window Size, which is 1 KB.
 	MinWindowSize = 1 << 10
+
+	// MaxWindowSize is the maximum encoder window size
+	// and the default decoder maximum window size.
 	MaxWindowSize = 1 << 29
 )
 
@@ -61,12 +49,11 @@
 )
 
 func newFrameDec(o decoderOptions) *frameDec {
-	d := frameDec{
-		o:             o,
-		maxWindowSize: MaxWindowSize,
+	if o.maxWindowSize > o.maxDecodedSize {
+		o.maxWindowSize = o.maxDecodedSize
 	}
-	if d.maxWindowSize > o.maxDecodedSize {
-		d.maxWindowSize = o.maxDecodedSize
+	d := frameDec{
+		o: o,
 	}
 	return &d
 }
@@ -78,44 +65,68 @@
 func (d *frameDec) reset(br byteBuffer) error {
 	d.HasCheckSum = false
 	d.WindowSize = 0
-	var b []byte
+	var signature [4]byte
 	for {
-		b = br.readSmall(4)
-		if b == nil {
+		var err error
+		// Check if we can read more...
+		b, err := br.readSmall(1)
+		switch err {
+		case io.EOF, io.ErrUnexpectedEOF:
 			return io.EOF
+		default:
+			return err
+		case nil:
+			signature[0] = b[0]
 		}
-		if !bytes.Equal(b[1:4], skippableFrameMagic) || b[0]&0xf0 != 0x50 {
-			if debug {
-				println("Not skippable", hex.EncodeToString(b), hex.EncodeToString(skippableFrameMagic))
+		// Read the rest, don't allow io.ErrUnexpectedEOF
+		b, err = br.readSmall(3)
+		switch err {
+		case io.EOF:
+			return io.EOF
+		default:
+			return err
+		case nil:
+			copy(signature[1:], b)
+		}
+
+		if !bytes.Equal(signature[1:4], skippableFrameMagic) || signature[0]&0xf0 != 0x50 {
+			if debugDecoder {
+				println("Not skippable", hex.EncodeToString(signature[:]), hex.EncodeToString(skippableFrameMagic))
 			}
 			// Break if not skippable frame.
 			break
 		}
 		// Read size to skip
-		b = br.readSmall(4)
-		if b == nil {
-			println("Reading Frame Size EOF")
-			return io.ErrUnexpectedEOF
+		b, err = br.readSmall(4)
+		if err != nil {
+			if debugDecoder {
+				println("Reading Frame Size", err)
+			}
+			return err
 		}
 		n := uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
 		println("Skipping frame with", n, "bytes.")
-		err := br.skipN(int(n))
+		err = br.skipN(int64(n))
 		if err != nil {
-			if debug {
+			if debugDecoder {
 				println("Reading discarded frame", err)
 			}
 			return err
 		}
 	}
-	if !bytes.Equal(b, frameMagic) {
-		println("Got magic numbers: ", b, "want:", frameMagic)
+	if !bytes.Equal(signature[:], frameMagic) {
+		if debugDecoder {
+			println("Got magic numbers: ", signature, "want:", frameMagic)
+		}
 		return ErrMagicMismatch
 	}
 
 	// Read Frame_Header_Descriptor
 	fhd, err := br.readByte()
 	if err != nil {
-		println("Reading Frame_Header_Descriptor", err)
+		if debugDecoder {
+			println("Reading Frame_Header_Descriptor", err)
+		}
 		return err
 	}
 	d.SingleSegment = fhd&(1<<5) != 0
@@ -130,7 +141,9 @@
 	if !d.SingleSegment {
 		wd, err := br.readByte()
 		if err != nil {
-			println("Reading Window_Descriptor", err)
+			if debugDecoder {
+				println("Reading Window_Descriptor", err)
+			}
 			return err
 		}
 		printf("raw: %x, mantissa: %d, exponent: %d\n", wd, wd&7, wd>>3)
@@ -147,12 +160,11 @@
 		if size == 3 {
 			size = 4
 		}
-		b = br.readSmall(int(size))
-		if b == nil {
-			if debug {
-				println("Reading Dictionary_ID", io.ErrUnexpectedEOF)
-			}
-			return io.ErrUnexpectedEOF
+
+		b, err := br.readSmall(int(size))
+		if err != nil {
+			println("Reading Dictionary_ID", err)
+			return err
 		}
 		var id uint32
 		switch size {
@@ -163,7 +175,7 @@
 		case 4:
 			id = uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)
 		}
-		if debug {
+		if debugDecoder {
 			println("Dict size", size, "ID:", id)
 		}
 		if id > 0 {
@@ -185,12 +197,12 @@
 	default:
 		fcsSize = 1 << v
 	}
-	d.FrameContentSize = 0
+	d.FrameContentSize = fcsUnknown
 	if fcsSize > 0 {
-		b := br.readSmall(fcsSize)
-		if b == nil {
-			println("Reading Frame content", io.ErrUnexpectedEOF)
-			return io.ErrUnexpectedEOF
+		b, err := br.readSmall(fcsSize)
+		if err != nil {
+			println("Reading Frame content", err)
+			return err
 		}
 		switch fcsSize {
 		case 1:
@@ -205,10 +217,11 @@
 			d2 := uint32(b[4]) | (uint32(b[5]) << 8) | (uint32(b[6]) << 16) | (uint32(b[7]) << 24)
 			d.FrameContentSize = uint64(d1) | (uint64(d2) << 32)
 		}
-		if debug {
-			println("field size bits:", v, "fcsSize:", fcsSize, "FrameContentSize:", d.FrameContentSize, hex.EncodeToString(b[:fcsSize]), "singleseg:", d.SingleSegment, "window:", d.WindowSize)
+		if debugDecoder {
+			println("Read FCS:", d.FrameContentSize)
 		}
 	}
+
 	// Move this to shared.
 	d.HasCheckSum = fhd&(1<<2) != 0
 	if d.HasCheckSum {
@@ -218,29 +231,47 @@
 		d.crc.Reset()
 	}
 
+	if d.WindowSize > d.o.maxWindowSize {
+		if debugDecoder {
+			printf("window size %d > max %d\n", d.WindowSize, d.o.maxWindowSize)
+		}
+		return ErrWindowSizeExceeded
+	}
+
 	if d.WindowSize == 0 && d.SingleSegment {
 		// We may not need window in this case.
 		d.WindowSize = d.FrameContentSize
 		if d.WindowSize < MinWindowSize {
 			d.WindowSize = MinWindowSize
 		}
+		if d.WindowSize > d.o.maxDecodedSize {
+			if debugDecoder {
+				printf("window size %d > max %d\n", d.WindowSize, d.o.maxWindowSize)
+			}
+			return ErrDecoderSizeExceeded
+		}
 	}
 
-	if d.WindowSize > d.maxWindowSize {
-		printf("window size %d > max %d\n", d.WindowSize, d.maxWindowSize)
-		return ErrWindowSizeExceeded
-	}
 	// The minimum Window_Size is 1 KB.
 	if d.WindowSize < MinWindowSize {
-		println("got window size: ", d.WindowSize)
+		if debugDecoder {
+			println("got window size: ", d.WindowSize)
+		}
 		return ErrWindowSizeTooSmall
 	}
 	d.history.windowSize = int(d.WindowSize)
-	if d.o.lowMem && d.history.windowSize < maxBlockSize {
-		d.history.maxSize = d.history.windowSize * 2
+	if !d.o.lowMem || d.history.windowSize < maxBlockSize {
+		// Alloc 2x window size if not low-mem, or very small window size.
+		d.history.allocFrameBuffer = d.history.windowSize * 2
 	} else {
-		d.history.maxSize = d.history.windowSize + maxBlockSize
+		// Alloc with one additional block
+		d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize
 	}
+
+	if debugDecoder {
+		println("Frame: Dict:", d.DictionaryID, "FrameContentSize:", d.FrameContentSize, "singleseg:", d.SingleSegment, "window:", d.WindowSize, "crc:", d.HasCheckSum)
+	}
+
 	// history contains input - maybe we do something
 	d.rawInput = br
 	return nil
@@ -248,56 +279,37 @@
 
 // next will start decoding the next block from stream.
 func (d *frameDec) next(block *blockDec) error {
-	if debug {
-		printf("decoding new block %p:%p", block, block.data)
+	if debugDecoder {
+		println("decoding new block")
 	}
 	err := block.reset(d.rawInput, d.WindowSize)
 	if err != nil {
 		println("block error:", err)
 		// Signal the frame decoder we have a problem.
-		d.sendErr(block, err)
+		block.sendErr(err)
 		return err
 	}
-	block.input <- struct{}{}
-	if debug {
-		println("next block:", block)
-	}
-	d.asyncRunningMu.Lock()
-	defer d.asyncRunningMu.Unlock()
-	if !d.asyncRunning {
-		return nil
-	}
-	if block.Last {
-		// We indicate the frame is done by sending io.EOF
-		d.decoding <- block
-		return io.EOF
-	}
-	d.decoding <- block
 	return nil
 }
 
-// sendEOF will queue an error block on the frame.
-// This will cause the frame decoder to return when it encounters the block.
-// Returns true if the decoder was added.
-func (d *frameDec) sendErr(block *blockDec, err error) bool {
-	d.asyncRunningMu.Lock()
-	defer d.asyncRunningMu.Unlock()
-	if !d.asyncRunning {
-		return false
-	}
-
-	println("sending error", err.Error())
-	block.sendErr(err)
-	d.decoding <- block
-	return true
-}
-
 // checkCRC will check the checksum if the frame has one.
 // Will return ErrCRCMismatch if crc check failed, otherwise nil.
 func (d *frameDec) checkCRC() error {
 	if !d.HasCheckSum {
 		return nil
 	}
+
+	// We can overwrite upper tmp now
+	want, err := d.rawInput.readSmall(4)
+	if err != nil {
+		println("CRC missing?", err)
+		return err
+	}
+
+	if d.o.ignoreChecksum {
+		return nil
+	}
+
 	var tmp [4]byte
 	got := d.crc.Sum64()
 	// Flip to match file order.
@@ -306,142 +318,29 @@
 	tmp[2] = byte(got >> 16)
 	tmp[3] = byte(got >> 24)
 
-	// We can overwrite upper tmp now
-	want := d.rawInput.readSmall(4)
-	if want == nil {
-		println("CRC missing?")
-		return io.ErrUnexpectedEOF
-	}
-
 	if !bytes.Equal(tmp[:], want) {
-		if debug {
+		if debugDecoder {
 			println("CRC Check Failed:", tmp[:], "!=", want)
 		}
 		return ErrCRCMismatch
 	}
-	if debug {
+	if debugDecoder {
 		println("CRC ok", tmp[:])
 	}
 	return nil
 }
 
-func (d *frameDec) initAsync() {
-	if !d.o.lowMem && !d.SingleSegment {
-		// set max extra size history to 10MB.
-		d.history.maxSize = d.history.windowSize + maxBlockSize*5
+// consumeCRC reads the checksum data if the frame has one.
+func (d *frameDec) consumeCRC() error {
+	if d.HasCheckSum {
+		_, err := d.rawInput.readSmall(4)
+		if err != nil {
+			println("CRC missing?", err)
+			return err
+		}
 	}
-	// re-alloc if more than one extra block size.
-	if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize {
-		d.history.b = make([]byte, 0, d.history.maxSize)
-	}
-	if cap(d.history.b) < d.history.maxSize {
-		d.history.b = make([]byte, 0, d.history.maxSize)
-	}
-	if cap(d.decoding) < d.o.concurrent {
-		d.decoding = make(chan *blockDec, d.o.concurrent)
-	}
-	if debug {
-		h := d.history
-		printf("history init. len: %d, cap: %d", len(h.b), cap(h.b))
-	}
-	d.asyncRunningMu.Lock()
-	d.asyncRunning = true
-	d.asyncRunningMu.Unlock()
-}
 
-// startDecoder will start decoding blocks and write them to the writer.
-// The decoder will stop as soon as an error occurs or at end of frame.
-// When the frame has finished decoding the *bufio.Reader
-// containing the remaining input will be sent on frameDec.frameDone.
-func (d *frameDec) startDecoder(output chan decodeOutput) {
-	written := int64(0)
-
-	defer func() {
-		d.asyncRunningMu.Lock()
-		d.asyncRunning = false
-		d.asyncRunningMu.Unlock()
-
-		// Drain the currently decoding.
-		d.history.error = true
-	flushdone:
-		for {
-			select {
-			case b := <-d.decoding:
-				b.history <- &d.history
-				output <- <-b.result
-			default:
-				break flushdone
-			}
-		}
-		println("frame decoder done, signalling done")
-		d.frameDone.Done()
-	}()
-	// Get decoder for first block.
-	block := <-d.decoding
-	block.history <- &d.history
-	for {
-		var next *blockDec
-		// Get result
-		r := <-block.result
-		if r.err != nil {
-			println("Result contained error", r.err)
-			output <- r
-			return
-		}
-		if debug {
-			println("got result, from ", d.offset, "to", d.offset+int64(len(r.b)))
-			d.offset += int64(len(r.b))
-		}
-		if !block.Last {
-			// Send history to next block
-			select {
-			case next = <-d.decoding:
-				if debug {
-					println("Sending ", len(d.history.b), "bytes as history")
-				}
-				next.history <- &d.history
-			default:
-				// Wait until we have sent the block, so
-				// other decoders can potentially get the decoder.
-				next = nil
-			}
-		}
-
-		// Add checksum, async to decoding.
-		if d.HasCheckSum {
-			n, err := d.crc.Write(r.b)
-			if err != nil {
-				r.err = err
-				if n != len(r.b) {
-					r.err = io.ErrShortWrite
-				}
-				output <- r
-				return
-			}
-		}
-		written += int64(len(r.b))
-		if d.SingleSegment && uint64(written) > d.FrameContentSize {
-			println("runDecoder: single segment and", uint64(written), ">", d.FrameContentSize)
-			r.err = ErrFrameSizeExceeded
-			output <- r
-			return
-		}
-		if block.Last {
-			r.err = d.checkCRC()
-			output <- r
-			return
-		}
-		output <- r
-		if next == nil {
-			// There was no decoder available, we wait for one now that we have sent to the writer.
-			if debug {
-				println("Sending ", len(d.history.b), " bytes as history")
-			}
-			next = <-d.decoding
-			next.history <- &d.history
-		}
-		block = next
-	}
+	return nil
 }
 
 // runDecoder will create a sync decoder that will decode a block of data.
@@ -450,41 +349,67 @@
 
 	// We use the history for output to avoid copying it.
 	d.history.b = dst
+	d.history.ignoreBuffer = len(dst)
 	// Store input length, so we only check new data.
 	crcStart := len(dst)
+	d.history.decoders.maxSyncLen = 0
+	if d.FrameContentSize != fcsUnknown {
+		d.history.decoders.maxSyncLen = d.FrameContentSize + uint64(len(dst))
+		if d.history.decoders.maxSyncLen > d.o.maxDecodedSize {
+			return dst, ErrDecoderSizeExceeded
+		}
+		if uint64(cap(dst)) < d.history.decoders.maxSyncLen {
+			// Alloc for output
+			dst2 := make([]byte, len(dst), d.history.decoders.maxSyncLen+compressedBlockOverAlloc)
+			copy(dst2, dst)
+			dst = dst2
+		}
+	}
 	var err error
 	for {
 		err = dec.reset(d.rawInput, d.WindowSize)
 		if err != nil {
 			break
 		}
-		if debug {
+		if debugDecoder {
 			println("next block:", dec)
 		}
 		err = dec.decodeBuf(&d.history)
-		if err != nil || dec.Last {
+		if err != nil {
 			break
 		}
 		if uint64(len(d.history.b)) > d.o.maxDecodedSize {
 			err = ErrDecoderSizeExceeded
 			break
 		}
-		if d.SingleSegment && uint64(len(d.history.b)) > d.o.maxDecodedSize {
-			println("runDecoder: single segment and", uint64(len(d.history.b)), ">", d.o.maxDecodedSize)
+		if uint64(len(d.history.b)-crcStart) > d.FrameContentSize {
+			println("runDecoder: FrameContentSize exceeded", uint64(len(d.history.b)-crcStart), ">", d.FrameContentSize)
 			err = ErrFrameSizeExceeded
 			break
 		}
+		if dec.Last {
+			break
+		}
+		if debugDecoder {
+			println("runDecoder: FrameContentSize", uint64(len(d.history.b)-crcStart), "<=", d.FrameContentSize)
+		}
 	}
 	dst = d.history.b
 	if err == nil {
-		if d.HasCheckSum {
-			var n int
-			n, err = d.crc.Write(dst[crcStart:])
-			if err == nil {
-				if n != len(dst)-crcStart {
-					err = io.ErrShortWrite
-				} else {
-					err = d.checkCRC()
+		if d.FrameContentSize != fcsUnknown && uint64(len(d.history.b)-crcStart) != d.FrameContentSize {
+			err = ErrFrameSizeMismatch
+		} else if d.HasCheckSum {
+			if d.o.ignoreChecksum {
+				err = d.consumeCRC()
+			} else {
+				var n int
+				n, err = d.crc.Write(dst[crcStart:])
+				if err == nil {
+					if n != len(dst)-crcStart {
+						err = io.ErrShortWrite
+					} else {
+						err = d.checkCRC()
+					}
 				}
 			}
 		}