gRPC migration update

Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/klauspost/compress/zstd/decoder.go b/vendor/github.com/klauspost/compress/zstd/decoder.go
index 35a3cda..f593e46 100644
--- a/vendor/github.com/klauspost/compress/zstd/decoder.go
+++ b/vendor/github.com/klauspost/compress/zstd/decoder.go
@@ -5,7 +5,6 @@
 package zstd
 
 import (
-	"bytes"
 	"errors"
 	"io"
 	"sync"
@@ -23,17 +22,15 @@
 	// Unreferenced decoders, ready for use.
 	decoders chan *blockDec
 
-	// Unreferenced decoders, ready for use.
-	frames chan *frameDec
-
 	// Streams ready to be decoded.
 	stream chan decodeStream
 
 	// Current read position used for Reader functionality.
 	current decoderState
 
-	// Custom dictionaries
-	dicts map[uint32]struct{}
+	// Custom dictionaries.
+	// Always uses copies.
+	dicts map[uint32]dict
 
 	// streamWg is the waitgroup for all streams
 	streamWg sync.WaitGroup
@@ -66,7 +63,7 @@
 // A Decoder can be used in two modes:
 //
 // 1) As a stream, or
-// 2) For stateless decoding using DecodeAll or DecodeBuffer.
+// 2) For stateless decoding using DecodeAll.
 //
 // Only a single stream can be decoded concurrently, but the same decoder
 // can run multiple concurrent stateless decodes. It is even possible to
@@ -87,12 +84,23 @@
 	d.current.output = make(chan decodeOutput, d.o.concurrent)
 	d.current.flushed = true
 
+	if r == nil {
+		d.current.err = ErrDecoderNilInput
+	}
+
+	// Transfer option dicts.
+	d.dicts = make(map[uint32]dict, len(d.o.dicts))
+	for _, dc := range d.o.dicts {
+		d.dicts[dc.id] = dc
+	}
+	d.o.dicts = nil
+
 	// Create decoders
 	d.decoders = make(chan *blockDec, d.o.concurrent)
-	d.frames = make(chan *frameDec, d.o.concurrent)
 	for i := 0; i < d.o.concurrent; i++ {
-		d.frames <- newFrameDec(d.o)
-		d.decoders <- newBlockDec(d.o.lowMem)
+		dec := newBlockDec(d.o.lowMem)
+		dec.localFrame = newFrameDec(d.o)
+		d.decoders <- dec
 	}
 
 	if r == nil {
@@ -106,7 +114,7 @@
 // When the stream is done, io.EOF will be returned.
 func (d *Decoder) Read(p []byte) (int, error) {
 	if d.stream == nil {
-		return 0, errors.New("no input has been initialized")
+		return 0, ErrDecoderNilInput
 	}
 	var n int
 	for {
@@ -147,12 +155,20 @@
 
 // Reset will reset the decoder the supplied stream after the current has finished processing.
 // Note that this functionality cannot be used after Close has been called.
+// Reset can be called with a nil reader to release references to the previous reader.
+// After being called with a nil reader, no operations other than Reset, DecodeAll or Close
+// should be used.
 func (d *Decoder) Reset(r io.Reader) error {
 	if d.current.err == ErrDecoderClosed {
 		return d.current.err
 	}
+
+	d.drainOutput()
+
 	if r == nil {
-		return errors.New("nil Reader sent as input")
+		d.current.err = ErrDecoderNilInput
+		d.current.flushed = true
+		return nil
 	}
 
 	if d.stream == nil {
@@ -161,15 +177,19 @@
 		go d.startStreamDecoder(d.stream)
 	}
 
-	d.drainOutput()
-
 	// If bytes buffer and < 1MB, do sync decoding anyway.
-	if bb, ok := r.(*bytes.Buffer); ok && bb.Len() < 1<<20 {
+	if bb, ok := r.(byter); ok && bb.Len() < 1<<20 {
+		bb2 := bb
 		if debug {
 			println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
 		}
-		b := bb.Bytes()
-		dst, err := d.DecodeAll(b, nil)
+		b := bb2.Bytes()
+		var dst []byte
+		if cap(d.current.b) > 0 {
+			dst = d.current.b
+		}
+
+		dst, err := d.DecodeAll(b, dst[:0])
 		if err == nil {
 			err = io.EOF
 		}
@@ -177,7 +197,7 @@
 		d.current.err = err
 		d.current.flushed = true
 		if debug {
-			println("sync decode to ", len(dst), "bytes, err:", err)
+			println("sync decode to", len(dst), "bytes, err:", err)
 		}
 		return nil
 	}
@@ -216,20 +236,17 @@
 		println("current already flushed")
 		return
 	}
-	for {
-		select {
-		case v := <-d.current.output:
-			if v.d != nil {
-				if debug {
-					printf("re-adding decoder %p", v.d)
-				}
-				d.decoders <- v.d
+	for v := range d.current.output {
+		if v.d != nil {
+			if debug {
+				printf("re-adding decoder %p", v.d)
 			}
-			if v.err == errEndOfStream {
-				println("current flushed")
-				d.current.flushed = true
-				return
-			}
+			d.decoders <- v.d
+		}
+		if v.err == errEndOfStream {
+			println("current flushed")
+			d.current.flushed = true
+			return
 		}
 	}
 }
@@ -239,7 +256,7 @@
 // Any error encountered during the write is also returned.
 func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
 	if d.stream == nil {
-		return 0, errors.New("no input has been initialized")
+		return 0, ErrDecoderNilInput
 	}
 	var n int64
 	for {
@@ -277,23 +294,34 @@
 	}
 
 	// Grab a block decoder and frame decoder.
-	block, frame := <-d.decoders, <-d.frames
+	block := <-d.decoders
+	frame := block.localFrame
 	defer func() {
 		if debug {
 			printf("re-adding decoder: %p", block)
 		}
-		d.decoders <- block
 		frame.rawInput = nil
 		frame.bBuf = nil
-		d.frames <- frame
+		d.decoders <- block
 	}()
 	frame.bBuf = input
 
 	for {
+		frame.history.reset()
 		err := frame.reset(&frame.bBuf)
 		if err == io.EOF {
+			if debug {
+				println("frame reset return EOF")
+			}
 			return dst, nil
 		}
+		if frame.DictionaryID != nil {
+			dict, ok := d.dicts[*frame.DictionaryID]
+			if !ok {
+				return nil, ErrUnknownDictionary
+			}
+			frame.history.setDict(&dict)
+		}
 		if err != nil {
 			return dst, err
 		}
@@ -302,20 +330,24 @@
 		}
 		if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
 			// Never preallocate moe than 1 GB up front.
-			if uint64(cap(dst)) < frame.FrameContentSize {
+			if cap(dst)-len(dst) < int(frame.FrameContentSize) {
 				dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
 				copy(dst2, dst)
 				dst = dst2
 			}
 		}
 		if cap(dst) == 0 {
-			// Allocate window size * 2 by default if nothing is provided and we didn't get frame content size.
-			size := frame.WindowSize * 2
+			// Allocate len(input) * 2 by default if nothing is provided
+			// and we didn't get frame content size.
+			size := len(input) * 2
 			// Cap to 1 MB.
 			if size > 1<<20 {
 				size = 1 << 20
 			}
-			dst = make([]byte, 0, frame.WindowSize)
+			if uint64(size) > d.o.maxDecodedSize {
+				size = int(d.o.maxDecodedSize)
+			}
+			dst = make([]byte, 0, size)
 		}
 
 		dst, err = frame.runDecoder(dst, block)
@@ -323,6 +355,9 @@
 			return dst, err
 		}
 		if len(frame.bBuf) == 0 {
+			if debug {
+				println("frame dbuf empty")
+			}
 			break
 		}
 	}
@@ -456,10 +491,19 @@
 		br := readerWrapper{r: stream.r}
 	decodeStream:
 		for {
+			frame.history.reset()
 			err := frame.reset(&br)
 			if debug && err != nil {
 				println("Frame decoder returned", err)
 			}
+			if err == nil && frame.DictionaryID != nil {
+				dict, ok := d.dicts[*frame.DictionaryID]
+				if !ok {
+					err = ErrUnknownDictionary
+				} else {
+					frame.history.setDict(&dict)
+				}
+			}
 			if err != nil {
 				stream.output <- decodeOutput{
 					err: err,