VOL-1868 move simulated onu from voltha-go to voltha-simonu-adapter
Sourced from voltha-go commit 251a11c0ffe60512318a644cd6ce0dc4e12f4018
Change-Id: Iab179bc2f3dd772ed7f488d1c03d1a84ba75e874
diff --git a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
new file mode 100644
index 0000000..29bda63
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
@@ -0,0 +1,341 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "context"
+ "strings"
+ "sync"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/resolver"
+)
+
+type balancerWrapperBuilder struct {
+ b Balancer // The v1 balancer.
+}
+
+func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+ targetAddr := cc.Target()
+ targetSplitted := strings.Split(targetAddr, ":///")
+ if len(targetSplitted) >= 2 {
+ targetAddr = targetSplitted[1]
+ }
+
+ bwb.b.Start(targetAddr, BalancerConfig{
+ DialCreds: opts.DialCreds,
+ Dialer: opts.Dialer,
+ })
+ _, pickfirst := bwb.b.(*pickFirst)
+ bw := &balancerWrapper{
+ balancer: bwb.b,
+ pickfirst: pickfirst,
+ cc: cc,
+ targetAddr: targetAddr,
+ startCh: make(chan struct{}),
+ conns: make(map[resolver.Address]balancer.SubConn),
+ connSt: make(map[balancer.SubConn]*scState),
+ csEvltr: &balancer.ConnectivityStateEvaluator{},
+ state: connectivity.Idle,
+ }
+ cc.UpdateBalancerState(connectivity.Idle, bw)
+ go bw.lbWatcher()
+ return bw
+}
+
+func (bwb *balancerWrapperBuilder) Name() string {
+ return "wrapper"
+}
+
+type scState struct {
+ addr Address // The v1 address type.
+ s connectivity.State
+ down func(error)
+}
+
+type balancerWrapper struct {
+ balancer Balancer // The v1 balancer.
+ pickfirst bool
+
+ cc balancer.ClientConn
+ targetAddr string // Target without the scheme.
+
+ mu sync.Mutex
+ conns map[resolver.Address]balancer.SubConn
+ connSt map[balancer.SubConn]*scState
+ // This channel is closed when handling the first resolver result.
+ // lbWatcher blocks until this is closed, to avoid race between
+ // - NewSubConn is created, cc wants to notify balancer of state changes;
+ // - Build hasn't return, cc doesn't have access to balancer.
+ startCh chan struct{}
+
+ // To aggregate the connectivity state.
+ csEvltr *balancer.ConnectivityStateEvaluator
+ state connectivity.State
+}
+
+// lbWatcher watches the Notify channel of the balancer and manages
+// connections accordingly.
+func (bw *balancerWrapper) lbWatcher() {
+ <-bw.startCh
+ notifyCh := bw.balancer.Notify()
+ if notifyCh == nil {
+ // There's no resolver in the balancer. Connect directly.
+ a := resolver.Address{
+ Addr: bw.targetAddr,
+ Type: resolver.Backend,
+ }
+ sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
+ } else {
+ bw.mu.Lock()
+ bw.conns[a] = sc
+ bw.connSt[sc] = &scState{
+ addr: Address{Addr: bw.targetAddr},
+ s: connectivity.Idle,
+ }
+ bw.mu.Unlock()
+ sc.Connect()
+ }
+ return
+ }
+
+ for addrs := range notifyCh {
+ grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
+ if bw.pickfirst {
+ var (
+ oldA resolver.Address
+ oldSC balancer.SubConn
+ )
+ bw.mu.Lock()
+ for oldA, oldSC = range bw.conns {
+ break
+ }
+ bw.mu.Unlock()
+ if len(addrs) <= 0 {
+ if oldSC != nil {
+ // Teardown old sc.
+ bw.mu.Lock()
+ delete(bw.conns, oldA)
+ delete(bw.connSt, oldSC)
+ bw.mu.Unlock()
+ bw.cc.RemoveSubConn(oldSC)
+ }
+ continue
+ }
+
+ var newAddrs []resolver.Address
+ for _, a := range addrs {
+ newAddr := resolver.Address{
+ Addr: a.Addr,
+ Type: resolver.Backend, // All addresses from balancer are all backends.
+ ServerName: "",
+ Metadata: a.Metadata,
+ }
+ newAddrs = append(newAddrs, newAddr)
+ }
+ if oldSC == nil {
+ // Create new sc.
+ sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
+ } else {
+ bw.mu.Lock()
+ // For pickfirst, there should be only one SubConn, so the
+ // address doesn't matter. All states updating (up and down)
+ // and picking should all happen on that only SubConn.
+ bw.conns[resolver.Address{}] = sc
+ bw.connSt[sc] = &scState{
+ addr: addrs[0], // Use the first address.
+ s: connectivity.Idle,
+ }
+ bw.mu.Unlock()
+ sc.Connect()
+ }
+ } else {
+ bw.mu.Lock()
+ bw.connSt[oldSC].addr = addrs[0]
+ bw.mu.Unlock()
+ oldSC.UpdateAddresses(newAddrs)
+ }
+ } else {
+ var (
+ add []resolver.Address // Addresses need to setup connections.
+ del []balancer.SubConn // Connections need to tear down.
+ )
+ resAddrs := make(map[resolver.Address]Address)
+ for _, a := range addrs {
+ resAddrs[resolver.Address{
+ Addr: a.Addr,
+ Type: resolver.Backend, // All addresses from balancer are all backends.
+ ServerName: "",
+ Metadata: a.Metadata,
+ }] = a
+ }
+ bw.mu.Lock()
+ for a := range resAddrs {
+ if _, ok := bw.conns[a]; !ok {
+ add = append(add, a)
+ }
+ }
+ for a, c := range bw.conns {
+ if _, ok := resAddrs[a]; !ok {
+ del = append(del, c)
+ delete(bw.conns, a)
+ // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
+ }
+ }
+ bw.mu.Unlock()
+ for _, a := range add {
+ sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
+ } else {
+ bw.mu.Lock()
+ bw.conns[a] = sc
+ bw.connSt[sc] = &scState{
+ addr: resAddrs[a],
+ s: connectivity.Idle,
+ }
+ bw.mu.Unlock()
+ sc.Connect()
+ }
+ }
+ for _, c := range del {
+ bw.cc.RemoveSubConn(c)
+ }
+ }
+ }
+}
+
+func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ scSt, ok := bw.connSt[sc]
+ if !ok {
+ return
+ }
+ if s == connectivity.Idle {
+ sc.Connect()
+ }
+ oldS := scSt.s
+ scSt.s = s
+ if oldS != connectivity.Ready && s == connectivity.Ready {
+ scSt.down = bw.balancer.Up(scSt.addr)
+ } else if oldS == connectivity.Ready && s != connectivity.Ready {
+ if scSt.down != nil {
+ scSt.down(errConnClosing)
+ }
+ }
+ sa := bw.csEvltr.RecordTransition(oldS, s)
+ if bw.state != sa {
+ bw.state = sa
+ }
+ bw.cc.UpdateBalancerState(bw.state, bw)
+ if s == connectivity.Shutdown {
+ // Remove state for this sc.
+ delete(bw.connSt, sc)
+ }
+}
+
+func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ select {
+ case <-bw.startCh:
+ default:
+ close(bw.startCh)
+ }
+ // There should be a resolver inside the balancer.
+ // All updates here, if any, are ignored.
+}
+
+func (bw *balancerWrapper) Close() {
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ select {
+ case <-bw.startCh:
+ default:
+ close(bw.startCh)
+ }
+ bw.balancer.Close()
+}
+
+// The picker is the balancerWrapper itself.
+// It either blocks or returns error, consistent with v1 balancer Get().
+func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) {
+ failfast := true // Default failfast is true.
+ if ss, ok := rpcInfoFromContext(ctx); ok {
+ failfast = ss.failfast
+ }
+ a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
+ if err != nil {
+ return nil, nil, err
+ }
+ if p != nil {
+ done = func(balancer.DoneInfo) { p() }
+ defer func() {
+ if err != nil {
+ p()
+ }
+ }()
+ }
+
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ if bw.pickfirst {
+ // Get the first sc in conns.
+ for _, sc := range bw.conns {
+ return sc, done, nil
+ }
+ return nil, nil, balancer.ErrNoSubConnAvailable
+ }
+ sc, ok1 := bw.conns[resolver.Address{
+ Addr: a.Addr,
+ Type: resolver.Backend,
+ ServerName: "",
+ Metadata: a.Metadata,
+ }]
+ s, ok2 := bw.connSt[sc]
+ if !ok1 || !ok2 {
+ // This can only happen due to a race where Get() returned an address
+ // that was subsequently removed by Notify. In this case we should
+ // retry always.
+ return nil, nil, balancer.ErrNoSubConnAvailable
+ }
+ switch s.s {
+ case connectivity.Ready, connectivity.Idle:
+ return sc, done, nil
+ case connectivity.Shutdown, connectivity.TransientFailure:
+ // If the returned sc has been shut down or is in transient failure,
+ // return error, and this RPC will fail or wait for another picker (if
+ // non-failfast).
+ return nil, nil, balancer.ErrTransientFailure
+ default:
+ // For other states (connecting or unknown), the v1 balancer would
+ // traditionally wait until ready and then issue the RPC. Returning
+ // ErrNoSubConnAvailable will be a slight improvement in that it will
+ // allow the balancer to choose another address in case others are
+ // connected.
+ return nil, nil, balancer.ErrNoSubConnAvailable
+ }
+}