[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/vendor/github.com/pierrec/lz4/writer.go b/vendor/github.com/pierrec/lz4/writer.go
index 2cc8d95..6a60a9a 100644
--- a/vendor/github.com/pierrec/lz4/writer.go
+++ b/vendor/github.com/pierrec/lz4/writer.go
@@ -4,10 +4,18 @@
"encoding/binary"
"fmt"
"io"
+ "runtime"
"github.com/pierrec/lz4/internal/xxh32"
)
+// zResult contains the results of compressing a block.
+type zResult struct {
+ size uint32 // Block header
+ data []byte // Compressed data
+ checksum uint32 // Data checksum
+}
+
// Writer implements the LZ4 frame encoder.
type Writer struct {
Header
@@ -18,10 +26,13 @@
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
dst io.Writer // Destination.
checksum xxh32.XXHZero // Frame checksum.
- zdata []byte // Compressed data.
- data []byte // Data to be compressed.
+ data []byte // Data to be compressed + buffer for compressed data.
idx int // Index into data.
hashtable [winSize]int // Hash table used in CompressBlock().
+
+ // For concurrency.
+ c chan chan zResult // Channel for block compression goroutines and writer goroutine.
+ err error // Any error encountered while writing to the underlying destination.
}
// NewWriter returns a new LZ4 frame encoder.
@@ -29,30 +40,92 @@
// The supplied Header is checked at the first Write.
// It is ok to change it before the first Write but then not until a Reset() is performed.
func NewWriter(dst io.Writer) *Writer {
- return &Writer{dst: dst}
+ z := new(Writer)
+ z.Reset(dst)
+ return z
+}
+
+// WithConcurrency sets the number of concurrent goroutines used for compression.
+// A negative value sets the concurrency to GOMAXPROCS.
+func (z *Writer) WithConcurrency(n int) *Writer {
+ switch {
+ case n == 0 || n == 1:
+ z.c = nil
+ return z
+ case n < 0:
+ n = runtime.GOMAXPROCS(0)
+ }
+ z.c = make(chan chan zResult, n)
+ // Writer goroutine managing concurrent block compression goroutines.
+ go func() {
+ // Process next block compression item.
+ for c := range z.c {
+ // Read the next compressed block result.
+ // Waiting here ensures that the blocks are output in the order they were sent.
+ // The incoming channel is always closed as it indicates to the caller that
+ // the block has been processed.
+ res := <-c
+ n := len(res.data)
+ if n == 0 {
+ // Notify the block compression routine that we are done with its result.
+ // This is used when a sentinel block is sent to terminate the compression.
+ close(c)
+ return
+ }
+ // Write the block.
+ if err := z.writeUint32(res.size); err != nil && z.err == nil {
+ z.err = err
+ }
+ if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
+ z.err = err
+ }
+ if z.BlockChecksum {
+ if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
+ z.err = err
+ }
+ }
+ if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
+ // It is now safe to release the buffer as it is no longer in use by any goroutine.
+ putBuffer(cap(res.data), res.data)
+ }
+ if h := z.OnBlockDone; h != nil {
+ h(n)
+ }
+ close(c)
+ }
+ }()
+ return z
+}
+
+// newBuffers fetches a pooled block buffer sized according to the Header.
+// Its first half holds the data to be compressed; the remaining capacity is
+// used for the compressed output.
+func (z *Writer) newBuffers() {
+ bSize := z.Header.BlockMaxSize
+ buf := getBuffer(bSize)
+ z.data = buf[:bSize] // Uncompressed buffer is the first half.
+}
+
+// freeBuffers puts the writer's buffers back to the pool.
+func (z *Writer) freeBuffers() {
+ // Put the buffer back into the pool, if any.
+ putBuffer(z.Header.BlockMaxSize, z.data)
+ z.data = nil
}
// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
func (z *Writer) writeHeader() error {
// Default to 4Mb if BlockMaxSize is not set.
if z.Header.BlockMaxSize == 0 {
- z.Header.BlockMaxSize = bsMapID[7]
+ z.Header.BlockMaxSize = blockSize4M
}
// The only option that needs to be validated.
bSize := z.Header.BlockMaxSize
- bSizeID, ok := bsMapValue[bSize]
- if !ok {
+ if !isValidBlockSize(z.Header.BlockMaxSize) {
return fmt.Errorf("lz4: invalid block max size: %d", bSize)
}
// Allocate the compressed/uncompressed buffers.
// The compressed buffer cannot exceed the uncompressed one.
- if cap(z.zdata) < bSize {
- // Only allocate if there is not enough capacity.
- // Allocate both buffers at once.
- z.zdata = make([]byte, 2*bSize)
- }
- z.data = z.zdata[:bSize] // Uncompressed buffer is the first half.
- z.zdata = z.zdata[:cap(z.zdata)][bSize:] // Compressed buffer is the second half.
+ z.newBuffers()
z.idx = 0
// Size is optional.
@@ -72,7 +145,7 @@
flg |= 1 << 2
}
buf[4] = flg
- buf[5] = bSizeID << 4
+ buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
// Current buffer size: magic(4) + flags(1) + block max size (1).
n := 6
@@ -152,28 +225,34 @@
// compressBlock compresses a block.
func (z *Writer) compressBlock(data []byte) error {
if !z.NoChecksum {
- z.checksum.Write(data)
+ _, _ = z.checksum.Write(data)
}
+ if z.c != nil {
+ c := make(chan zResult)
+ z.c <- c // Send now to guarantee order
+ go writerCompressBlock(c, z.Header, data)
+ return nil
+ }
+
+ zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
// The compressed block size cannot exceed the input's.
var zn int
- var err error
if level := z.Header.CompressionLevel; level != 0 {
- zn, err = CompressBlockHC(data, z.zdata, level)
+ zn, _ = CompressBlockHC(data, zdata, level)
} else {
- zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
+ zn, _ = CompressBlock(data, zdata, z.hashtable[:])
}
- var zdata []byte
var bLen uint32
if debugFlag {
debug("block compression %d => %d", len(data), zn)
}
- if err == nil && zn > 0 && zn < len(data) {
+ if zn > 0 && zn < len(data) {
// Compressible and compressed size smaller than uncompressed: ok!
bLen = uint32(zn)
- zdata = z.zdata[:zn]
+ zdata = zdata[:zn]
} else {
// Uncompressed block.
bLen = uint32(len(data)) | compressedBlockFlag
@@ -220,13 +299,35 @@
return nil
}
- if err := z.compressBlock(z.data[:z.idx]); err != nil {
- return err
- }
+ data := z.data[:z.idx]
z.idx = 0
+ if z.c == nil {
+ return z.compressBlock(data)
+ }
+ if !z.NoChecksum {
+ _, _ = z.checksum.Write(data)
+ }
+ c := make(chan zResult)
+ z.c <- c
+ writerCompressBlock(c, z.Header, data)
return nil
}
+func (z *Writer) close() error {
+ if z.c == nil {
+ return nil
+ }
+ // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
+ c := make(chan zResult)
+ z.c <- c
+ c <- zResult{}
+ // Wait for the main goroutine to complete.
+ <-c
+ // At this point the main goroutine has shut down or is about to return.
+ z.c = nil
+ return z.err
+}
+
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
func (z *Writer) Close() error {
if !z.Header.done {
@@ -237,6 +338,10 @@
if err := z.Flush(); err != nil {
return err
}
+ if err := z.close(); err != nil {
+ return err
+ }
+ z.freeBuffers()
if debugFlag {
debug("writing last empty block")
@@ -258,12 +363,19 @@
// initial state from NewWriter, but instead writing to w.
// No access to the underlying io.Writer is performed.
func (z *Writer) Reset(w io.Writer) {
- z.Header = Header{}
+ n := cap(z.c)
+ _ = z.close()
+ z.freeBuffers()
+ z.Header.Reset()
z.dst = w
z.checksum.Reset()
- z.zdata = z.zdata[:0]
- z.data = z.data[:0]
z.idx = 0
+ z.err = nil
+ // reset hashtable to ensure deterministic output.
+ for i := range z.hashtable {
+ z.hashtable[i] = 0
+ }
+ z.WithConcurrency(n)
}
// writeUint32 writes a uint32 to the underlying writer.
@@ -273,3 +385,29 @@
_, err := z.dst.Write(buf)
return err
}
+
+// writerCompressBlock compresses data into a pooled buffer and sends the result
+// on the supplied channel.
+func writerCompressBlock(c chan zResult, header Header, data []byte) {
+ zdata := getBuffer(header.BlockMaxSize)
+ // The compressed block size cannot exceed the input's.
+ var zn int
+ if level := header.CompressionLevel; level != 0 {
+ zn, _ = CompressBlockHC(data, zdata, level)
+ } else {
+ var hashTable [winSize]int
+ zn, _ = CompressBlock(data, zdata, hashTable[:])
+ }
+ var res zResult
+ if zn > 0 && zn < len(data) {
+ res.size = uint32(zn)
+ res.data = zdata[:zn]
+ } else {
+ res.size = uint32(len(data)) | compressedBlockFlag
+ res.data = data
+ }
+ if header.BlockChecksum {
+ res.checksum = xxh32.ChecksumZero(res.data)
+ }
+ c <- res
+}
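
For reference, a minimal sketch of how the updated vendored writer is driven with the new WithConcurrency option; the file names, concurrency level and error handling below are illustrative assumptions, not part of this change:

package main

import (
	"io"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	in, err := os.Open("payload.bin") // illustrative input
	if err != nil {
		panic(err)
	}
	defer in.Close()

	out, err := os.Create("payload.bin.lz4") // illustrative output
	if err != nil {
		panic(err)
	}
	defer out.Close()

	// NewWriter now goes through Reset(dst); WithConcurrency(-1) uses up to
	// GOMAXPROCS block-compression goroutines, while 0 or 1 keeps the writer
	// synchronous (z.c stays nil).
	zw := lz4.NewWriter(out).WithConcurrency(-1)
	zw.Header.BlockChecksum = true

	if _, err := io.Copy(zw, in); err != nil {
		panic(err)
	}
	// Close flushes the pending block, terminates the writer goroutine via the
	// sentinel block, returns the pooled buffers and reports any deferred
	// write error.
	if err := zw.Close(); err != nil {
		panic(err)
	}
}

Note that Reset(w) captures cap(z.c) and re-applies it via WithConcurrency, so a reused Writer keeps its previously configured concurrency.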