[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/pierrec/lz4/writer.go b/vendor/github.com/pierrec/lz4/writer.go
index 2cc8d95..6a60a9a 100644
--- a/vendor/github.com/pierrec/lz4/writer.go
+++ b/vendor/github.com/pierrec/lz4/writer.go
@@ -4,10 +4,18 @@
 	"encoding/binary"
 	"fmt"
 	"io"
+	"runtime"
 
 	"github.com/pierrec/lz4/internal/xxh32"
 )
 
+// zResult contains the results of compressing a block.
+type zResult struct {
+	size     uint32 // Block header
+	data     []byte // Compressed data
+	checksum uint32 // Data checksum
+}
+
 // Writer implements the LZ4 frame encoder.
 type Writer struct {
 	Header
@@ -18,10 +26,13 @@
 	buf       [19]byte      // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
 	dst       io.Writer     // Destination.
 	checksum  xxh32.XXHZero // Frame checksum.
-	zdata     []byte        // Compressed data.
-	data      []byte        // Data to be compressed.
+	data      []byte        // Data to be compressed + buffer for compressed data.
 	idx       int           // Index into data.
 	hashtable [winSize]int  // Hash table used in CompressBlock().
+
+	// For concurrency.
+	c   chan chan zResult // Channel for block compression goroutines and writer goroutine.
+	err error             // Any error encountered while writing to the underlying destination.
 }
 
 // NewWriter returns a new LZ4 frame encoder.
@@ -29,30 +40,92 @@
 // The supplied Header is checked at the first Write.
 // It is ok to change it before the first Write but then not until a Reset() is performed.
 func NewWriter(dst io.Writer) *Writer {
-	return &Writer{dst: dst}
+	z := new(Writer)
+	z.Reset(dst)
+	return z
+}
+
+// WithConcurrency sets the number of concurrent go routines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+	switch {
+	case n == 0 || n == 1:
+		z.c = nil
+		return z
+	case n < 0:
+		n = runtime.GOMAXPROCS(0)
+	}
+	z.c = make(chan chan zResult, n)
+	// Writer goroutine managing concurrent block compression goroutines.
+	go func() {
+		// Process next block compression item.
+		for c := range z.c {
+			// Read the next compressed block result.
+			// Waiting here ensures that the blocks are output in the order they were sent.
+			// The incoming channel is always closed as it indicates to the caller that
+			// the block has been processed.
+			res := <-c
+			n := len(res.data)
+			if n == 0 {
+				// Notify the block compression routine that we are done with its result.
+				// This is used when a sentinel block is sent to terminate the compression.
+				close(c)
+				return
+			}
+			// Write the block.
+			if err := z.writeUint32(res.size); err != nil && z.err == nil {
+				z.err = err
+			}
+			if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+				z.err = err
+			}
+			if z.BlockChecksum {
+				if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+					z.err = err
+				}
+			}
+			if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
+				// It is now safe to release the buffer as no longer in use by any goroutine.
+				putBuffer(cap(res.data), res.data)
+			}
+			if h := z.OnBlockDone; h != nil {
+				h(n)
+			}
+			close(c)
+		}
+	}()
+	return z
+}
+
+// newBuffers instantiates new buffers which size matches the one in Header.
+// The returned buffers are for decompression and compression respectively.
+func (z *Writer) newBuffers() {
+	bSize := z.Header.BlockMaxSize
+	buf := getBuffer(bSize)
+	z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+	// Put the buffer back into the pool, if any.
+	putBuffer(z.Header.BlockMaxSize, z.data)
+	z.data = nil
 }
 
 // writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
 func (z *Writer) writeHeader() error {
 	// Default to 4Mb if BlockMaxSize is not set.
 	if z.Header.BlockMaxSize == 0 {
-		z.Header.BlockMaxSize = bsMapID[7]
+		z.Header.BlockMaxSize = blockSize4M
 	}
 	// The only option that needs to be validated.
 	bSize := z.Header.BlockMaxSize
-	bSizeID, ok := bsMapValue[bSize]
-	if !ok {
+	if !isValidBlockSize(z.Header.BlockMaxSize) {
 		return fmt.Errorf("lz4: invalid block max size: %d", bSize)
 	}
 	// Allocate the compressed/uncompressed buffers.
 	// The compressed buffer cannot exceed the uncompressed one.
-	if cap(z.zdata) < bSize {
-		// Only allocate if there is not enough capacity.
-		// Allocate both buffers at once.
-		z.zdata = make([]byte, 2*bSize)
-	}
-	z.data = z.zdata[:bSize]                 // Uncompressed buffer is the first half.
-	z.zdata = z.zdata[:cap(z.zdata)][bSize:] // Compressed buffer is the second half.
+	z.newBuffers()
 	z.idx = 0
 
 	// Size is optional.
@@ -72,7 +145,7 @@
 		flg |= 1 << 2
 	}
 	buf[4] = flg
-	buf[5] = bSizeID << 4
+	buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
 
 	// Current buffer size: magic(4) + flags(1) + block max size (1).
 	n := 6
@@ -152,28 +225,34 @@
 // compressBlock compresses a block.
 func (z *Writer) compressBlock(data []byte) error {
 	if !z.NoChecksum {
-		z.checksum.Write(data)
+		_, _ = z.checksum.Write(data)
 	}
 
+	if z.c != nil {
+		c := make(chan zResult)
+		z.c <- c // Send now to guarantee order
+		go writerCompressBlock(c, z.Header, data)
+		return nil
+	}
+
+	zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
 	// The compressed block size cannot exceed the input's.
 	var zn int
-	var err error
 
 	if level := z.Header.CompressionLevel; level != 0 {
-		zn, err = CompressBlockHC(data, z.zdata, level)
+		zn, _ = CompressBlockHC(data, zdata, level)
 	} else {
-		zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
+		zn, _ = CompressBlock(data, zdata, z.hashtable[:])
 	}
 
-	var zdata []byte
 	var bLen uint32
 	if debugFlag {
 		debug("block compression %d => %d", len(data), zn)
 	}
-	if err == nil && zn > 0 && zn < len(data) {
+	if zn > 0 && zn < len(data) {
 		// Compressible and compressed size smaller than uncompressed: ok!
 		bLen = uint32(zn)
-		zdata = z.zdata[:zn]
+		zdata = zdata[:zn]
 	} else {
 		// Uncompressed block.
 		bLen = uint32(len(data)) | compressedBlockFlag
@@ -220,13 +299,35 @@
 		return nil
 	}
 
-	if err := z.compressBlock(z.data[:z.idx]); err != nil {
-		return err
-	}
+	data := z.data[:z.idx]
 	z.idx = 0
+	if z.c == nil {
+		return z.compressBlock(data)
+	}
+	if !z.NoChecksum {
+		_, _ = z.checksum.Write(data)
+	}
+	c := make(chan zResult)
+	z.c <- c
+	writerCompressBlock(c, z.Header, data)
 	return nil
 }
 
+func (z *Writer) close() error {
+	if z.c == nil {
+		return nil
+	}
+	// Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+	c := make(chan zResult)
+	z.c <- c
+	c <- zResult{}
+	// Wait for the main goroutine to complete.
+	<-c
+	// At this point the main goroutine has shut down or is about to return.
+	z.c = nil
+	return z.err
+}
+
 // Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
 func (z *Writer) Close() error {
 	if !z.Header.done {
@@ -237,6 +338,10 @@
 	if err := z.Flush(); err != nil {
 		return err
 	}
+	if err := z.close(); err != nil {
+		return err
+	}
+	z.freeBuffers()
 
 	if debugFlag {
 		debug("writing last empty block")
@@ -258,12 +363,19 @@
 // initial state from NewWriter, but instead writing to w.
 // No access to the underlying io.Writer is performed.
 func (z *Writer) Reset(w io.Writer) {
-	z.Header = Header{}
+	n := cap(z.c)
+	_ = z.close()
+	z.freeBuffers()
+	z.Header.Reset()
 	z.dst = w
 	z.checksum.Reset()
-	z.zdata = z.zdata[:0]
-	z.data = z.data[:0]
 	z.idx = 0
+	z.err = nil
+	// reset hashtable to ensure deterministic output.
+	for i := range z.hashtable {
+		z.hashtable[i] = 0
+	}
+	z.WithConcurrency(n)
 }
 
 // writeUint32 writes a uint32 to the underlying writer.
@@ -273,3 +385,29 @@
 	_, err := z.dst.Write(buf)
 	return err
 }
+
+// writerCompressBlock compresses data into a pooled buffer and writes its result
+// out to the input channel.
+func writerCompressBlock(c chan zResult, header Header, data []byte) {
+	zdata := getBuffer(header.BlockMaxSize)
+	// The compressed block size cannot exceed the input's.
+	var zn int
+	if level := header.CompressionLevel; level != 0 {
+		zn, _ = CompressBlockHC(data, zdata, level)
+	} else {
+		var hashTable [winSize]int
+		zn, _ = CompressBlock(data, zdata, hashTable[:])
+	}
+	var res zResult
+	if zn > 0 && zn < len(data) {
+		res.size = uint32(zn)
+		res.data = zdata[:zn]
+	} else {
+		res.size = uint32(len(data)) | compressedBlockFlag
+		res.data = data
+	}
+	if header.BlockChecksum {
+		res.checksum = xxh32.ChecksumZero(res.data)
+	}
+	c <- res
+}