[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/pierrec/lz4/README.md b/vendor/github.com/pierrec/lz4/README.md
index be1f52a..4ee388e 100644
--- a/vendor/github.com/pierrec/lz4/README.md
+++ b/vendor/github.com/pierrec/lz4/README.md
@@ -83,24 +83,8 @@
 
 ## Contributors
 
-Thanks to all contributors so far:
+Thanks to all [contributors](https://github.com/pierrec/lz4/graphs/contributors) so far!
 
-- [@klauspost](https://github.com/klauspost)
-- [@heidawei](https://github.com/heidawei)
-- [@x4m](https://github.com/x4m)
-- [@Zariel](https://github.com/Zariel)
-- [@edwingeng](https://github.com/edwingeng)
-- [@danielmoy-google](https://github.com/danielmoy-google)
-- [@honda-tatsuya](https://github.com/honda-tatsuya)
-- [@h8liu](https://github.com/h8liu)
-- [@sbinet](https://github.com/sbinet)
-- [@fingon](https://github.com/fingon)
-- [@emfree](https://github.com/emfree)
-- [@lhemala](https://github.com/lhemala)
-- [@connor4312](https://github.com/connor4312)
-- [@oov](https://github.com/oov)
-- [@arya](https://github.com/arya)
-- [@ikkeps](https://github.com/ikkeps)
+Special thanks to [@Zariel](https://github.com/Zariel) for his asm implementation of the decoder.
 
-Special thanks to [@Zariel](https://github.com/Zariel) for his asm implementation of the decoder
-Special thanks to [@klauspost](https://github.com/klauspost) for his work on optimizing the code
+Special thanks to [@klauspost](https://github.com/klauspost) for his work on optimizing the code.
diff --git a/vendor/github.com/pierrec/lz4/block.go b/vendor/github.com/pierrec/lz4/block.go
index 5755cda..664d9be 100644
--- a/vendor/github.com/pierrec/lz4/block.go
+++ b/vendor/github.com/pierrec/lz4/block.go
@@ -2,8 +2,8 @@
 
 import (
 	"encoding/binary"
-	"fmt"
 	"math/bits"
+	"sync"
 )
 
 // blockHash hashes the lower 6 bytes into a value < htSize.
@@ -35,24 +35,31 @@
 
 // CompressBlock compresses the source buffer into the destination one.
 // This is the fast version of LZ4 compression and also the default one.
-// The size of hashTable must be at least 64Kb.
 //
-// The size of the compressed data is returned. If it is 0 and no error, then the data is incompressible.
+// The argument hashTable is scratch space for a hash table used by the
+// compressor. If provided, it should have length at least 1<<16. If it is
+// shorter (or nil), CompressBlock allocates its own hash table.
+//
+// The size of the compressed data is returned.
+//
+// If the destination buffer size is lower than CompressBlockBound and
+// the compressed size is 0 and no error, then the data is incompressible.
 //
 // An error is returned if the destination buffer is too small.
-func CompressBlock(src, dst []byte, hashTable []int) (di int, err error) {
+func CompressBlock(src, dst []byte, hashTable []int) (_ int, err error) {
 	defer recoverBlock(&err)
 
+	// Return 0, nil only if the destination buffer size is < CompressBlockBound.
+	isNotCompressible := len(dst) < CompressBlockBound(len(src))
+
 	// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
-	// This significantly speeds up incompressible data and usually has very small impact on compresssion.
+	// This significantly speeds up incompressible data and usually has very small impact on compression.
 	// bytes to skip =  1 + (bytes since last match >> adaptSkipLog)
 	const adaptSkipLog = 7
-	sn, dn := len(src)-mfLimit, len(dst)
-	if sn <= 0 || dn == 0 {
-		return 0, nil
-	}
 	if len(hashTable) < htSize {
-		return 0, fmt.Errorf("hash table too small, should be at least %d in size", htSize)
+		htIface := htPool.Get()
+		defer htPool.Put(htIface)
+		hashTable = (*(htIface).(*[htSize]int))[:]
 	}
 	// Prove to the compiler the table has at least htSize elements.
 	// The compiler can see that "uint32() >> hashShift" cannot be out of bounds.
@@ -60,7 +67,11 @@
 
 	// si: Current position of the search.
 	// anchor: Position of the current literals.
-	var si, anchor int
+	var si, di, anchor int
+	sn := len(src) - mfLimit
+	if sn <= 0 {
+		goto lastLiterals
+	}
 
 	// Fast scan strategy: the hash table only stores the last 4 bytes sequences.
 	for si < sn {
@@ -124,7 +135,7 @@
 		si, mLen = si+mLen, si+minMatch
 
 		// Find the longest match by looking by batches of 8 bytes.
-		for si < sn {
+		for si+8 < sn {
 			x := binary.LittleEndian.Uint64(src[si:]) ^ binary.LittleEndian.Uint64(src[si-offset:])
 			if x == 0 {
 				si += 8
@@ -184,7 +195,8 @@
 		hashTable[h] = si - 2
 	}
 
-	if anchor == 0 {
+lastLiterals:
+	if isNotCompressible && anchor == 0 {
 		// Incompressible.
 		return 0, nil
 	}
@@ -205,7 +217,7 @@
 	di++
 
 	// Write the last literals.
-	if di >= anchor {
+	if isNotCompressible && di >= anchor {
 		// Incompressible.
 		return 0, nil
 	}
@@ -213,6 +225,13 @@
 	return di, nil
 }
 
+// Pool of hash tables for CompressBlock.
+var htPool = sync.Pool{
+	New: func() interface{} {
+		return new([htSize]int)
+	},
+}
+
 // blockHash hashes 4 bytes into a value < winSize.
 func blockHashHC(x uint32) uint32 {
 	const hasher uint32 = 2654435761 // Knuth multiplicative hash.
@@ -224,22 +243,24 @@
 //
 // CompressBlockHC compression ratio is better than CompressBlock but it is also slower.
 //
-// The size of the compressed data is returned. If it is 0 and no error, then the data is not compressible.
+// The size of the compressed data is returned.
+//
+// If the destination buffer size is lower than CompressBlockBound and
+// the compressed size is 0 and no error, then the data is incompressible.
 //
 // An error is returned if the destination buffer is too small.
-func CompressBlockHC(src, dst []byte, depth int) (di int, err error) {
+func CompressBlockHC(src, dst []byte, depth int) (_ int, err error) {
 	defer recoverBlock(&err)
 
+	// Return 0, nil only if the destination buffer size is < CompressBlockBound.
+	isNotCompressible := len(dst) < CompressBlockBound(len(src))
+
 	// adaptSkipLog sets how quickly the compressor begins skipping blocks when data is incompressible.
-	// This significantly speeds up incompressible data and usually has very small impact on compresssion.
+	// This significantly speeds up incompressible data and usually has very small impact on compression.
 	// bytes to skip =  1 + (bytes since last match >> adaptSkipLog)
 	const adaptSkipLog = 7
 
-	sn, dn := len(src)-mfLimit, len(dst)
-	if sn <= 0 || dn == 0 {
-		return 0, nil
-	}
-	var si int
+	var si, di, anchor int
 
 	// hashTable: stores the last position found for a given hash
 	// chainTable: stores previous positions for a given hash
@@ -249,7 +270,11 @@
 		depth = winSize
 	}
 
-	anchor := si
+	sn := len(src) - mfLimit
+	if sn <= 0 {
+		goto lastLiterals
+	}
+
 	for si < sn {
 		// Hash the next 4 bytes (sequence).
 		match := binary.LittleEndian.Uint32(src[si:])
@@ -356,12 +381,13 @@
 		}
 	}
 
-	if anchor == 0 {
+	if isNotCompressible && anchor == 0 {
 		// Incompressible.
 		return 0, nil
 	}
 
 	// Last literals.
+lastLiterals:
 	lLen := len(src) - anchor
 	if lLen < 0xF {
 		dst[di] = byte(lLen << 4)
@@ -378,7 +404,7 @@
 	di++
 
 	// Write the last literals.
-	if di >= anchor {
+	if isNotCompressible && di >= anchor {
 		// Incompressible.
 		return 0, nil
 	}
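
Note on the block.go changes above: CompressBlock now accepts a nil or short hashTable and draws scratch space from an internal sync.Pool, and a zero return only signals incompressible data when the destination is smaller than CompressBlockBound. A minimal usage sketch under the assumption the vendored package is imported as github.com/pierrec/lz4 (sample data and variable names are illustrative only):

package main

import (
	"bytes"
	"fmt"

	"github.com/pierrec/lz4"
)

func main() {
	src := bytes.Repeat([]byte("hello gRPC migration "), 8)

	// A destination sized with CompressBlockBound always holds the result,
	// so a too-small-buffer error cannot occur.
	dst := make([]byte, lz4.CompressBlockBound(len(src)))

	// Passing nil for the hash table is now allowed; CompressBlock takes a
	// scratch table from its pool instead of returning an error.
	n, err := lz4.CompressBlock(src, dst, nil)
	if err != nil {
		panic(err)
	}

	out := make([]byte, len(src))
	m, err := lz4.UncompressBlock(dst[:n], out)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed %d -> %d bytes, round-trip ok: %t\n", len(src), n, bytes.Equal(out[:m], src))
}
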
diff --git a/vendor/github.com/pierrec/lz4/lz4.go b/vendor/github.com/pierrec/lz4/lz4.go
index cdbf961..a3284bd 100644
--- a/vendor/github.com/pierrec/lz4/lz4.go
+++ b/vendor/github.com/pierrec/lz4/lz4.go
@@ -10,14 +10,20 @@
 //
 package lz4
 
+import (
+	"math/bits"
+	"sync"
+)
+
 const (
 	// Extension is the LZ4 frame file name extension
 	Extension = ".lz4"
 	// Version is the LZ4 frame format version
 	Version = 1
 
-	frameMagic     uint32 = 0x184D2204
-	frameSkipMagic uint32 = 0x184D2A50
+	frameMagic       uint32 = 0x184D2204
+	frameSkipMagic   uint32 = 0x184D2A50
+	frameMagicLegacy uint32 = 0x184C2102
 
 	// The following constants are used to setup the compression algorithm.
 	minMatch            = 4  // the minimum size of the match sequence size (4 bytes)
@@ -34,28 +40,67 @@
 	hashLog = 16
 	htSize  = 1 << hashLog
 
-	mfLimit = 8 + minMatch // The last match cannot start within the last 12 bytes.
+	mfLimit = 10 + minMatch // The last match cannot start within the last 14 bytes.
 )
 
 // map the block max size id with its value in bytes: 64Kb, 256Kb, 1Mb and 4Mb.
 const (
-	blockSize64K  = 64 << 10
-	blockSize256K = 256 << 10
-	blockSize1M   = 1 << 20
-	blockSize4M   = 4 << 20
+	blockSize64K = 1 << (16 + 2*iota)
+	blockSize256K
+	blockSize1M
+	blockSize4M
 )
 
 var (
-	bsMapID    = map[byte]int{4: blockSize64K, 5: blockSize256K, 6: blockSize1M, 7: blockSize4M}
-	bsMapValue = map[int]byte{blockSize64K: 4, blockSize256K: 5, blockSize1M: 6, blockSize4M: 7}
+	// Keep a pool of buffers for each valid block size.
+	bsMapValue = [...]*sync.Pool{
+		newBufferPool(2 * blockSize64K),
+		newBufferPool(2 * blockSize256K),
+		newBufferPool(2 * blockSize1M),
+		newBufferPool(2 * blockSize4M),
+	}
 )
 
+// newBufferPool returns a pool for buffers of the given size.
+func newBufferPool(size int) *sync.Pool {
+	return &sync.Pool{
+		New: func() interface{} {
+			return make([]byte, size)
+		},
+	}
+}
+
+// getBuffer fetches a buffer of the given size from its pool.
+func getBuffer(size int) []byte {
+	idx := blockSizeValueToIndex(size) - 4
+	return bsMapValue[idx].Get().([]byte)
+}
+
+// putBuffer returns a buffer to its pool.
+func putBuffer(size int, buf []byte) {
+	if cap(buf) > 0 {
+		idx := blockSizeValueToIndex(size) - 4
+		bsMapValue[idx].Put(buf[:cap(buf)])
+	}
+}
+func blockSizeIndexToValue(i byte) int {
+	return 1 << (16 + 2*uint(i))
+}
+func isValidBlockSize(size int) bool {
+	const blockSizeMask = blockSize64K | blockSize256K | blockSize1M | blockSize4M
+
+	return size&blockSizeMask > 0 && bits.OnesCount(uint(size)) == 1
+}
+func blockSizeValueToIndex(size int) byte {
+	return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2)
+}
+
 // Header describes the various flags that can be set on a Writer or obtained from a Reader.
 // The default values match those of the LZ4 frame format definition
 // (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
 //
 // NB. in a Reader, in case of concatenated frames, the Header values may change between Read() calls.
-// It is the caller responsibility to check them if necessary.
+// It is the caller's responsibility to check them if necessary.
 type Header struct {
 	BlockChecksum    bool   // Compressed blocks checksum flag.
 	NoChecksum       bool   // Frame checksum flag.
@@ -64,3 +109,8 @@
 	CompressionLevel int    // Compression level (higher is better, use 0 for fastest compression).
 	done             bool   // Header processed flag (Read or Write and checked).
 }
+
+// Reset resets the Header's internal state.
+func (h *Header) Reset() {
+	h.done = false
+}
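
Note on the lz4.go changes above: the bsMapID/bsMapValue lookup maps are replaced by arithmetic helpers. The sketch below mirrors that arithmetic outside the package to show how frame block-size IDs 4..7 round-trip with 64KB, 256KB, 1MB and 4MB; the helper names here are local to the sketch, not part of the lz4 API:

package main

import (
	"fmt"
	"math/bits"
)

// indexToValue mirrors blockSizeIndexToValue: size = 1 << (16 + 2*i).
func indexToValue(i byte) int { return 1 << (16 + 2*uint(i)) }

// valueToIndex mirrors blockSizeValueToIndex: ID = 4 + trailingZeros(size>>16)/2.
func valueToIndex(size int) byte { return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2) }

func main() {
	for id := byte(4); id <= 7; id++ {
		size := indexToValue(id - 4)
		fmt.Printf("ID %d <-> %d bytes (round-trip ID %d)\n", id, size, valueToIndex(size))
	}
}
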
diff --git a/vendor/github.com/pierrec/lz4/reader.go b/vendor/github.com/pierrec/lz4/reader.go
index 126b792..87dd72b 100644
--- a/vendor/github.com/pierrec/lz4/reader.go
+++ b/vendor/github.com/pierrec/lz4/reader.go
@@ -88,10 +88,10 @@
 	z.NoChecksum = b>>2&1 == 0
 
 	bmsID := buf[1] >> 4 & 0x7
-	bSize, ok := bsMapID[bmsID]
-	if !ok {
+	if bmsID < 4 || bmsID > 7 {
 		return fmt.Errorf("lz4: invalid block max size ID: %d", bmsID)
 	}
+	bSize := blockSizeIndexToValue(bmsID - 4)
 	z.BlockMaxSize = bSize
 
 	// Allocate the compressed/uncompressed buffers.
diff --git a/vendor/github.com/pierrec/lz4/reader_legacy.go b/vendor/github.com/pierrec/lz4/reader_legacy.go
new file mode 100644
index 0000000..1670a77
--- /dev/null
+++ b/vendor/github.com/pierrec/lz4/reader_legacy.go
@@ -0,0 +1,207 @@
+package lz4
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+)
+
+// ReaderLegacy implements the LZ4Demo frame decoder.
+// The Header is set after the first call to Read().
+type ReaderLegacy struct {
+	Header
+	// Handler called when a block has been successfully read.
+	// It provides the number of bytes read.
+	OnBlockDone func(size int)
+
+	lastBlock bool
+	buf       [8]byte   // Scrap buffer.
+	pos       int64     // Current position in src.
+	src       io.Reader // Source.
+	zdata     []byte    // Compressed data.
+	data      []byte    // Uncompressed data.
+	idx       int       // Index of unread bytes into data.
+	skip      int64     // Bytes to skip before next read.
+	dpos      int64     // Position in dest
+}
+
+// NewReaderLegacy returns a new LZ4Demo frame decoder.
+// No access to the underlying io.Reader is performed.
+func NewReaderLegacy(src io.Reader) *ReaderLegacy {
+	r := &ReaderLegacy{src: src}
+	return r
+}
+
+// readLegacyHeader checks the legacy frame magic number and prepares the
+// decompression buffers. Legacy frames carry no frame descriptor beyond the
+// magic number and always use a fixed 8MB block size.
+func (z *ReaderLegacy) readLegacyHeader() error {
+	z.lastBlock = false
+	magic, err := z.readUint32()
+	if err != nil {
+		z.pos += 4
+		if err == io.ErrUnexpectedEOF {
+			return io.EOF
+		}
+		return err
+	}
+	if magic != frameMagicLegacy {
+		return ErrInvalid
+	}
+	z.pos += 4
+
+	// Legacy frames have a fixed 8MB block size.
+	// https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md#legacy-frame
+	bSize := blockSize4M * 2
+
+	// Allocate the compressed/uncompressed buffers.
+	// The compressed buffer cannot exceed the uncompressed one.
+	if n := 2 * bSize; cap(z.zdata) < n {
+		z.zdata = make([]byte, n, n)
+	}
+	if debugFlag {
+		debug("header block max size size=%d", bSize)
+	}
+	z.zdata = z.zdata[:bSize]
+	z.data = z.zdata[:cap(z.zdata)][bSize:]
+	z.idx = len(z.data)
+
+	z.Header.done = true
+	if debugFlag {
+		debug("header read: %v", z.Header)
+	}
+
+	return nil
+}
+
+// Read decompresses data from the underlying source into the supplied buffer.
+//
+// Since there can be multiple streams concatenated, Header values may
+// change between calls to Read(). If that is the case, no data is actually read from
+// the underlying io.Reader, to allow for potential input buffer resizing.
+func (z *ReaderLegacy) Read(buf []byte) (int, error) {
+	if debugFlag {
+		debug("Read buf len=%d", len(buf))
+	}
+	if !z.Header.done {
+		if err := z.readLegacyHeader(); err != nil {
+			return 0, err
+		}
+		if debugFlag {
+			debug("header read OK compressed buffer %d / %d uncompressed buffer %d : %d index=%d",
+				len(z.zdata), cap(z.zdata), len(z.data), cap(z.data), z.idx)
+		}
+	}
+
+	if len(buf) == 0 {
+		return 0, nil
+	}
+
+	if z.idx == len(z.data) {
+		// No data ready for reading, process the next block.
+		if debugFlag {
+			debug("  reading block from writer %d %d", z.idx, blockSize4M*2)
+		}
+
+		// Reset uncompressed buffer
+		z.data = z.zdata[:cap(z.zdata)][len(z.zdata):]
+
+		bLen, err := z.readUint32()
+		if err != nil {
+			return 0, err
+		}
+		if debugFlag {
+			debug("   bLen %d (0x%x) offset = %d (0x%x)", bLen, bLen, z.pos, z.pos)
+		}
+		z.pos += 4
+
+		// Legacy blocks are always compressed, even when detrimental
+		if debugFlag {
+			debug("   compressed block size %d", bLen)
+		}
+
+		if int(bLen) > cap(z.data) {
+			return 0, fmt.Errorf("lz4: invalid block size: %d", bLen)
+		}
+		zdata := z.zdata[:bLen]
+		if _, err := io.ReadFull(z.src, zdata); err != nil {
+			return 0, err
+		}
+		z.pos += int64(bLen)
+
+		n, err := UncompressBlock(zdata, z.data)
+		if err != nil {
+			return 0, err
+		}
+
+		z.data = z.data[:n]
+		if z.OnBlockDone != nil {
+			z.OnBlockDone(n)
+		}
+
+		z.idx = 0
+
+		// Legacy blocks are fixed to 8MB; if we read a decompressed block smaller
+		// than this, it means we've reached the end of the stream.
+		if n < blockSize4M*2 {
+			z.lastBlock = true
+		}
+	}
+
+	if z.skip > int64(len(z.data[z.idx:])) {
+		z.skip -= int64(len(z.data[z.idx:]))
+		z.dpos += int64(len(z.data[z.idx:]))
+		z.idx = len(z.data)
+		return 0, nil
+	}
+
+	z.idx += int(z.skip)
+	z.dpos += z.skip
+	z.skip = 0
+
+	n := copy(buf, z.data[z.idx:])
+	z.idx += n
+	z.dpos += int64(n)
+	if debugFlag {
+		debug("%v] copied %d bytes to input (%d:%d)", z.lastBlock, n, z.idx, len(z.data))
+	}
+	if z.lastBlock && len(z.data) == z.idx {
+		return n, io.EOF
+	}
+	return n, nil
+}
+
+// Seek implements io.Seeker, but supports seeking forward from the current
+// position only. Any other seek will return an error. Allows skipping output
+// bytes which aren't needed, which in some scenarios is faster than reading
+// and discarding them.
+// Note this may cause future calls to Read() to read 0 bytes if all of the
+// data they would have returned is skipped.
+func (z *ReaderLegacy) Seek(offset int64, whence int) (int64, error) {
+	if offset < 0 || whence != io.SeekCurrent {
+		return z.dpos + z.skip, ErrUnsupportedSeek
+	}
+	z.skip += offset
+	return z.dpos + z.skip, nil
+}
+
+// Reset discards the Reader's state and makes it equivalent to the
+// result of its original state from NewReader, but reading from r instead.
+// This permits reusing a Reader rather than allocating a new one.
+func (z *ReaderLegacy) Reset(r io.Reader) {
+	z.Header = Header{}
+	z.pos = 0
+	z.src = r
+	z.zdata = z.zdata[:0]
+	z.data = z.data[:0]
+	z.idx = 0
+}
+
+// readUint32 reads a uint32 into the reader's scratch buffer.
+// The idea is to reuse the already allocated buffer and avoid an extra allocation.
+func (z *ReaderLegacy) readUint32() (uint32, error) {
+	buf := z.buf[:4]
+	_, err := io.ReadFull(z.src, buf)
+	x := binary.LittleEndian.Uint32(buf)
+	return x, err
+}
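
Note on the new reader_legacy.go file above: it adds a decoder for the legacy frame format (magic 0x184C2102), e.g. files produced by `lz4 -l`. A minimal usage sketch, with "legacy.lz4" as a placeholder path:

package main

import (
	"io"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	f, err := os.Open("legacy.lz4") // placeholder path to a legacy frame
	if err != nil {
		panic(err)
	}
	defer f.Close()

	zr := lz4.NewReaderLegacy(f)
	// Blocks are decoded as they are consumed; io.EOF is reported after the
	// final (short) block of the fixed 8MB block stream.
	if _, err := io.Copy(os.Stdout, zr); err != nil {
		panic(err)
	}
}
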
diff --git a/vendor/github.com/pierrec/lz4/writer.go b/vendor/github.com/pierrec/lz4/writer.go
index 2cc8d95..6a60a9a 100644
--- a/vendor/github.com/pierrec/lz4/writer.go
+++ b/vendor/github.com/pierrec/lz4/writer.go
@@ -4,10 +4,18 @@
 	"encoding/binary"
 	"fmt"
 	"io"
+	"runtime"
 
 	"github.com/pierrec/lz4/internal/xxh32"
 )
 
+// zResult contains the results of compressing a block.
+type zResult struct {
+	size     uint32 // Block header
+	data     []byte // Compressed data
+	checksum uint32 // Data checksum
+}
+
 // Writer implements the LZ4 frame encoder.
 type Writer struct {
 	Header
@@ -18,10 +26,13 @@
 	buf       [19]byte      // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
 	dst       io.Writer     // Destination.
 	checksum  xxh32.XXHZero // Frame checksum.
-	zdata     []byte        // Compressed data.
-	data      []byte        // Data to be compressed.
+	data      []byte        // Data to be compressed + buffer for compressed data.
 	idx       int           // Index into data.
 	hashtable [winSize]int  // Hash table used in CompressBlock().
+
+	// For concurrency.
+	c   chan chan zResult // Channel for block compression goroutines and writer goroutine.
+	err error             // Any error encountered while writing to the underlying destination.
 }
 
 // NewWriter returns a new LZ4 frame encoder.
@@ -29,30 +40,92 @@
 // The supplied Header is checked at the first Write.
 // It is ok to change it before the first Write but then not until a Reset() is performed.
 func NewWriter(dst io.Writer) *Writer {
-	return &Writer{dst: dst}
+	z := new(Writer)
+	z.Reset(dst)
+	return z
+}
+
+// WithConcurrency sets the number of concurrent goroutines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+	switch {
+	case n == 0 || n == 1:
+		z.c = nil
+		return z
+	case n < 0:
+		n = runtime.GOMAXPROCS(0)
+	}
+	z.c = make(chan chan zResult, n)
+	// Writer goroutine managing concurrent block compression goroutines.
+	go func() {
+		// Process next block compression item.
+		for c := range z.c {
+			// Read the next compressed block result.
+			// Waiting here ensures that the blocks are output in the order they were sent.
+			// The incoming channel is always closed as it indicates to the caller that
+			// the block has been processed.
+			res := <-c
+			n := len(res.data)
+			if n == 0 {
+				// Notify the block compression routine that we are done with its result.
+				// This is used when a sentinel block is sent to terminate the compression.
+				close(c)
+				return
+			}
+			// Write the block.
+			if err := z.writeUint32(res.size); err != nil && z.err == nil {
+				z.err = err
+			}
+			if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+				z.err = err
+			}
+			if z.BlockChecksum {
+				if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+					z.err = err
+				}
+			}
+			if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
+				// It is now safe to release the buffer as no longer in use by any goroutine.
+				putBuffer(cap(res.data), res.data)
+			}
+			if h := z.OnBlockDone; h != nil {
+				h(n)
+			}
+			close(c)
+		}
+	}()
+	return z
+}
+
+// newBuffers fetches a pooled buffer of twice the Header's BlockMaxSize: the first
+// half holds the data to be compressed, the second half receives the compressed output.
+func (z *Writer) newBuffers() {
+	bSize := z.Header.BlockMaxSize
+	buf := getBuffer(bSize)
+	z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+	// Put the buffer back into the pool, if any.
+	putBuffer(z.Header.BlockMaxSize, z.data)
+	z.data = nil
 }
 
 // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
 func (z *Writer) writeHeader() error {
 	// Default to 4Mb if BlockMaxSize is not set.
 	if z.Header.BlockMaxSize == 0 {
-		z.Header.BlockMaxSize = bsMapID[7]
+		z.Header.BlockMaxSize = blockSize4M
 	}
 	// The only option that needs to be validated.
 	bSize := z.Header.BlockMaxSize
-	bSizeID, ok := bsMapValue[bSize]
-	if !ok {
+	if !isValidBlockSize(z.Header.BlockMaxSize) {
 		return fmt.Errorf("lz4: invalid block max size: %d", bSize)
 	}
 	// Allocate the compressed/uncompressed buffers.
 	// The compressed buffer cannot exceed the uncompressed one.
-	if cap(z.zdata) < bSize {
-		// Only allocate if there is not enough capacity.
-		// Allocate both buffers at once.
-		z.zdata = make([]byte, 2*bSize)
-	}
-	z.data = z.zdata[:bSize]                 // Uncompressed buffer is the first half.
-	z.zdata = z.zdata[:cap(z.zdata)][bSize:] // Compressed buffer is the second half.
+	z.newBuffers()
 	z.idx = 0
 
 	// Size is optional.
@@ -72,7 +145,7 @@
 		flg |= 1 << 2
 	}
 	buf[4] = flg
-	buf[5] = bSizeID << 4
+	buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
 
 	// Current buffer size: magic(4) + flags(1) + block max size (1).
 	n := 6
@@ -152,28 +225,34 @@
 // compressBlock compresses a block.
 func (z *Writer) compressBlock(data []byte) error {
 	if !z.NoChecksum {
-		z.checksum.Write(data)
+		_, _ = z.checksum.Write(data)
 	}
 
+	if z.c != nil {
+		c := make(chan zResult)
+		z.c <- c // Send now to guarantee order
+		go writerCompressBlock(c, z.Header, data)
+		return nil
+	}
+
+	zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
 	// The compressed block size cannot exceed the input's.
 	var zn int
-	var err error
 
 	if level := z.Header.CompressionLevel; level != 0 {
-		zn, err = CompressBlockHC(data, z.zdata, level)
+		zn, _ = CompressBlockHC(data, zdata, level)
 	} else {
-		zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
+		zn, _ = CompressBlock(data, zdata, z.hashtable[:])
 	}
 
-	var zdata []byte
 	var bLen uint32
 	if debugFlag {
 		debug("block compression %d => %d", len(data), zn)
 	}
-	if err == nil && zn > 0 && zn < len(data) {
+	if zn > 0 && zn < len(data) {
 		// Compressible and compressed size smaller than uncompressed: ok!
 		bLen = uint32(zn)
-		zdata = z.zdata[:zn]
+		zdata = zdata[:zn]
 	} else {
 		// Uncompressed block.
 		bLen = uint32(len(data)) | compressedBlockFlag
@@ -220,13 +299,35 @@
 		return nil
 	}
 
-	if err := z.compressBlock(z.data[:z.idx]); err != nil {
-		return err
-	}
+	data := z.data[:z.idx]
 	z.idx = 0
+	if z.c == nil {
+		return z.compressBlock(data)
+	}
+	if !z.NoChecksum {
+		_, _ = z.checksum.Write(data)
+	}
+	c := make(chan zResult)
+	z.c <- c
+	writerCompressBlock(c, z.Header, data)
 	return nil
 }
 
+func (z *Writer) close() error {
+	if z.c == nil {
+		return nil
+	}
+	// Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+	c := make(chan zResult)
+	z.c <- c
+	c <- zResult{}
+	// Wait for the main goroutine to complete.
+	<-c
+	// At this point the main goroutine has shut down or is about to return.
+	z.c = nil
+	return z.err
+}
+
 // Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
 func (z *Writer) Close() error {
 	if !z.Header.done {
@@ -237,6 +338,10 @@
 	if err := z.Flush(); err != nil {
 		return err
 	}
+	if err := z.close(); err != nil {
+		return err
+	}
+	z.freeBuffers()
 
 	if debugFlag {
 		debug("writing last empty block")
@@ -258,12 +363,19 @@
 // initial state from NewWriter, but instead writing to w.
 // No access to the underlying io.Writer is performed.
 func (z *Writer) Reset(w io.Writer) {
-	z.Header = Header{}
+	n := cap(z.c)
+	_ = z.close()
+	z.freeBuffers()
+	z.Header.Reset()
 	z.dst = w
 	z.checksum.Reset()
-	z.zdata = z.zdata[:0]
-	z.data = z.data[:0]
 	z.idx = 0
+	z.err = nil
+	// reset hashtable to ensure deterministic output.
+	for i := range z.hashtable {
+		z.hashtable[i] = 0
+	}
+	z.WithConcurrency(n)
 }
 
 // writeUint32 writes a uint32 to the underlying writer.
@@ -273,3 +385,29 @@
 	_, err := z.dst.Write(buf)
 	return err
 }
+
+// writerCompressBlock compresses data into a pooled buffer and sends the
+// result on the supplied channel.
+func writerCompressBlock(c chan zResult, header Header, data []byte) {
+	zdata := getBuffer(header.BlockMaxSize)
+	// The compressed block size cannot exceed the input's.
+	var zn int
+	if level := header.CompressionLevel; level != 0 {
+		zn, _ = CompressBlockHC(data, zdata, level)
+	} else {
+		var hashTable [winSize]int
+		zn, _ = CompressBlock(data, zdata, hashTable[:])
+	}
+	var res zResult
+	if zn > 0 && zn < len(data) {
+		res.size = uint32(zn)
+		res.data = zdata[:zn]
+	} else {
+		res.size = uint32(len(data)) | compressedBlockFlag
+		res.data = data
+	}
+	if header.BlockChecksum {
+		res.checksum = xxh32.ChecksumZero(res.data)
+	}
+	c <- res
+}
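
Note on the writer.go changes above: when WithConcurrency is set, blocks are compressed on worker goroutines and written back in submission order by a dedicated writer goroutine. A minimal frame-compression sketch (file names are placeholders):

package main

import (
	"io"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	in, err := os.Open("input.bin")
	if err != nil {
		panic(err)
	}
	defer in.Close()

	out, err := os.Create("output.lz4")
	if err != nil {
		panic(err)
	}
	defer out.Close()

	// WithConcurrency(-1) uses GOMAXPROCS worker goroutines; 0 or 1 keeps the
	// writer single-threaded.
	zw := lz4.NewWriter(out).WithConcurrency(-1)
	zw.Header.BlockMaxSize = 4 << 20 // one of the valid sizes: 64KB, 256KB, 1MB, 4MB

	if _, err := io.Copy(zw, in); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil { // flushes remaining data and stops the worker goroutine
		panic(err)
	}
}
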