gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
index 080490a..f3c5057 100644
--- a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
+++ b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
@@ -23,7 +23,7 @@
"sync"
"time"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -38,6 +38,7 @@
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ ctx = withVersion(ctx)
grpcOpts, retryOpts := filterCallOptions(opts)
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
// short circuit for simplicity, and avoiding allocations.
@@ -103,6 +104,7 @@
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ ctx = withVersion(ctx)
grpcOpts, retryOpts := filterCallOptions(opts)
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
// short circuit for simplicity, and avoiding allocations.
@@ -113,10 +115,9 @@
return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
}
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
- logger.Warn("retry stream intercept", zap.Error(err))
if err != nil {
- // TODO(mwitkow): Maybe dial and transport errors should be retriable?
- return nil, err
+ logger.Error("streamer failed to create ClientStream", zap.Error(err))
+ return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
}
retryingStreamer := &serverStreamingRetryingStream{
client: c,
@@ -185,6 +186,7 @@
if !attemptRetry {
return lastErr // success or hard failure
}
+
// We start off from attempt 1, because zeroth was already made on normal SendMsg().
for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
@@ -192,12 +194,13 @@
}
newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
if err != nil {
- // TODO(mwitkow): Maybe dial and transport errors should be retriable?
- return err
+ s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
+ return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
}
s.setStream(newStream)
+
+ s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
- //fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr)
if !attemptRetry {
return lastErr
}