[SEBA-660]: Add IGMP support in BBSim
Change-Id: I9f5c7d8ad39ac82850b04e2c997996d6c47b32d2
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index 45baa2a..0044789 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -20,6 +20,7 @@
import (
"context"
+ "fmt"
"io"
"sync"
@@ -31,49 +32,78 @@
"google.golang.org/grpc/status"
)
+// v2PickerWrapper wraps a balancer.Picker while providing the
+// balancer.V2Picker API. It requires a pickerWrapper to generate errors
+// including the latest connectionError. To be deleted when balancer.Picker is
+// updated to the balancer.V2Picker API.
+type v2PickerWrapper struct {
+ picker balancer.Picker
+ connErr *connErr
+}
+
+func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+ sc, done, err := v.picker.Pick(info.Ctx, info)
+ if err != nil {
+ if err == balancer.ErrTransientFailure {
+ return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
+ }
+ return balancer.PickResult{}, err
+ }
+ return balancer.PickResult{SubConn: sc, Done: done}, nil
+}
+
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
- picker balancer.Picker
+ picker balancer.V2Picker
- // The latest connection happened.
- connErrMu sync.Mutex
- connErr error
+ // The latest connection error. TODO: remove when V1 picker is deprecated;
+ // balancer should be responsible for providing the error.
+ *connErr
}
-func newPickerWrapper() *pickerWrapper {
- bp := &pickerWrapper{blockingCh: make(chan struct{})}
- return bp
+type connErr struct {
+ mu sync.Mutex
+ err error
}
-func (bp *pickerWrapper) updateConnectionError(err error) {
- bp.connErrMu.Lock()
- bp.connErr = err
- bp.connErrMu.Unlock()
+func (c *connErr) updateConnectionError(err error) {
+ c.mu.Lock()
+ c.err = err
+ c.mu.Unlock()
}
-func (bp *pickerWrapper) connectionError() error {
- bp.connErrMu.Lock()
- err := bp.connErr
- bp.connErrMu.Unlock()
+func (c *connErr) connectionError() error {
+ c.mu.Lock()
+ err := c.err
+ c.mu.Unlock()
return err
}
+func newPickerWrapper() *pickerWrapper {
+ return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}}
+}
+
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
-func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
- bp.mu.Lock()
- if bp.done {
- bp.mu.Unlock()
+func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
+ pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
+}
+
+// updatePickerV2 installs p as the current picker and unblocks all blocked picks.
+func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) {
+ pw.mu.Lock()
+ if pw.done {
+ pw.mu.Unlock()
return
}
- bp.picker = p
- // bp.blockingCh should never be nil.
- close(bp.blockingCh)
- bp.blockingCh = make(chan struct{})
- bp.mu.Unlock()
+ pw.picker = p
+ // pw.blockingCh should never be nil.
+ close(pw.blockingCh)
+ pw.blockingCh = make(chan struct{})
+ pw.mu.Unlock()
}
func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
@@ -100,83 +130,85 @@
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
-func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
var ch chan struct{}
+ var lastPickErr error
for {
- bp.mu.Lock()
- if bp.done {
- bp.mu.Unlock()
+ pw.mu.Lock()
+ if pw.done {
+ pw.mu.Unlock()
return nil, nil, ErrClientConnClosing
}
- if bp.picker == nil {
- ch = bp.blockingCh
+ if pw.picker == nil {
+ ch = pw.blockingCh
}
- if ch == bp.blockingCh {
+ if ch == pw.blockingCh {
// This could happen when either:
- // - bp.picker is nil (the previous if condition), or
+ // - pw.picker is nil (the previous if condition), or
// - has called pick on the current picker.
- bp.mu.Unlock()
+ pw.mu.Unlock()
select {
case <-ctx.Done():
- if connectionErr := bp.connectionError(); connectionErr != nil {
- switch ctx.Err() {
- case context.DeadlineExceeded:
- return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr)
- case context.Canceled:
- return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr)
- }
+ var errStr string
+ if lastPickErr != nil {
+ errStr = "latest balancer error: " + lastPickErr.Error()
+ } else if connectionErr := pw.connectionError(); connectionErr != nil {
+ errStr = "latest connection error: " + connectionErr.Error()
+ } else {
+ errStr = ctx.Err().Error()
}
- return nil, nil, ctx.Err()
+ switch ctx.Err() {
+ case context.DeadlineExceeded:
+ return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
+ case context.Canceled:
+ return nil, nil, status.Error(codes.Canceled, errStr)
+ }
case <-ch:
}
continue
}
- ch = bp.blockingCh
- p := bp.picker
- bp.mu.Unlock()
+ ch = pw.blockingCh
+ p := pw.picker
+ pw.mu.Unlock()
- subConn, done, err := p.Pick(ctx, opts)
+ pickResult, err := p.Pick(info)
if err != nil {
- switch err {
- case balancer.ErrNoSubConnAvailable:
+ if err == balancer.ErrNoSubConnAvailable {
continue
- case balancer.ErrTransientFailure:
+ }
+ if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
if !failfast {
+ lastPickErr = err
continue
}
- return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
- case context.DeadlineExceeded:
- return nil, nil, status.Error(codes.DeadlineExceeded, err.Error())
- case context.Canceled:
- return nil, nil, status.Error(codes.Canceled, err.Error())
- default:
- if _, ok := status.FromError(err); ok {
- return nil, nil, err
- }
- // err is some other error.
- return nil, nil, status.Error(codes.Unknown, err.Error())
+ return nil, nil, status.Error(codes.Unavailable, err.Error())
}
+ if _, ok := status.FromError(err); ok {
+ return nil, nil, err
+ }
+ // err is some other error.
+ return nil, nil, status.Error(codes.Unknown, err.Error())
}
- acw, ok := subConn.(*acBalancerWrapper)
+ acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if !ok {
grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
if channelz.IsOn() {
- return t, doneChannelzWrapper(acw, done), nil
+ return t, doneChannelzWrapper(acw, pickResult.Done), nil
}
- return t, done, nil
+ return t, pickResult.Done, nil
}
- if done != nil {
+ if pickResult.Done != nil {
// Calling done with nil error, no bytes sent and no bytes received.
// DoneInfo with default value works.
- done(balancer.DoneInfo{})
+ pickResult.Done(balancer.DoneInfo{})
}
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
// If ok == false, ac.state is not READY.
@@ -186,12 +218,12 @@
}
}
-func (bp *pickerWrapper) close() {
- bp.mu.Lock()
- defer bp.mu.Unlock()
- if bp.done {
+func (pw *pickerWrapper) close() {
+ pw.mu.Lock()
+ defer pw.mu.Unlock()
+ if pw.done {
return
}
- bp.done = true
- close(bp.blockingCh)
+ pw.done = true
+ close(pw.blockingCh)
}