blob: b043d572dd739753faa1b6e633e2e07cdce4e08d [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package picker
16
17import (
18 "context"
19 "sync"
20
21 "go.uber.org/zap"
22 "go.uber.org/zap/zapcore"
23 "google.golang.org/grpc/balancer"
24 "google.golang.org/grpc/resolver"
25)
26
27// NewRoundrobinBalanced returns a new roundrobin balanced picker.
28func NewRoundrobinBalanced(
29 lg *zap.Logger,
30 scs []balancer.SubConn,
31 addrToSc map[resolver.Address]balancer.SubConn,
32 scToAddr map[balancer.SubConn]resolver.Address,
33) Picker {
34 return &rrBalanced{
35 lg: lg,
36 scs: scs,
37 addrToSc: addrToSc,
38 scToAddr: scToAddr,
39 }
40}
41
42type rrBalanced struct {
43 lg *zap.Logger
44
45 mu sync.RWMutex
46 next int
47 scs []balancer.SubConn
48
49 addrToSc map[resolver.Address]balancer.SubConn
50 scToAddr map[balancer.SubConn]resolver.Address
51}
52
53// Pick is called for every client request.
54func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
55 rb.mu.RLock()
56 n := len(rb.scs)
57 rb.mu.RUnlock()
58 if n == 0 {
59 return nil, nil, balancer.ErrNoSubConnAvailable
60 }
61
62 rb.mu.Lock()
63 cur := rb.next
64 sc := rb.scs[cur]
65 picked := rb.scToAddr[sc].Addr
66 rb.next = (rb.next + 1) % len(rb.scs)
67 rb.mu.Unlock()
68
69 rb.lg.Debug(
70 "picked",
71 zap.String("address", picked),
72 zap.Int("subconn-index", cur),
73 zap.Int("subconn-size", n),
74 )
75
76 doneFunc := func(info balancer.DoneInfo) {
77 // TODO: error handling?
78 fss := []zapcore.Field{
79 zap.Error(info.Err),
80 zap.String("address", picked),
81 zap.Bool("success", info.Err == nil),
82 zap.Bool("bytes-sent", info.BytesSent),
83 zap.Bool("bytes-received", info.BytesReceived),
84 }
85 if info.Err == nil {
86 rb.lg.Debug("balancer done", fss...)
87 } else {
88 rb.lg.Warn("balancer failed", fss...)
89 }
90 }
91 return sc, doneFunc, nil
92}