blob: 3c0e8e664bd30508027dd723456e7fca4c00306d [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package naming
16
17import (
18 "context"
19 "encoding/json"
20 "fmt"
21
22 etcd "github.com/coreos/etcd/clientv3"
23
24 "google.golang.org/grpc/codes"
25 "google.golang.org/grpc/naming"
26 "google.golang.org/grpc/status"
27)
28
29var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
30
31// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
32type GRPCResolver struct {
33 // Client is an initialized etcd client.
34 Client *etcd.Client
35}
36
37func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
38 switch nm.Op {
39 case naming.Add:
40 var v []byte
41 if v, err = json.Marshal(nm); err != nil {
42 return status.Error(codes.InvalidArgument, err.Error())
43 }
44 _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
45 case naming.Delete:
46 _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
47 default:
48 return status.Error(codes.InvalidArgument, "naming: bad naming op")
49 }
50 return err
51}
52
53func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
54 ctx, cancel := context.WithCancel(context.Background())
55 w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
56 return w, nil
57}
58
59type gRPCWatcher struct {
60 c *etcd.Client
61 target string
62 ctx context.Context
63 cancel context.CancelFunc
64 wch etcd.WatchChan
65 err error
66}
67
68// Next gets the next set of updates from the etcd resolver.
69// Calls to Next should be serialized; concurrent calls are not safe since
70// there is no way to reconcile the update ordering.
71func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
72 if gw.wch == nil {
73 // first Next() returns all addresses
74 return gw.firstNext()
75 }
76 if gw.err != nil {
77 return nil, gw.err
78 }
79
80 // process new events on target/*
81 wr, ok := <-gw.wch
82 if !ok {
83 gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
84 return nil, gw.err
85 }
86 if gw.err = wr.Err(); gw.err != nil {
87 return nil, gw.err
88 }
89
90 updates := make([]*naming.Update, 0, len(wr.Events))
91 for _, e := range wr.Events {
92 var jupdate naming.Update
93 var err error
94 switch e.Type {
95 case etcd.EventTypePut:
96 err = json.Unmarshal(e.Kv.Value, &jupdate)
97 jupdate.Op = naming.Add
98 case etcd.EventTypeDelete:
99 err = json.Unmarshal(e.PrevKv.Value, &jupdate)
100 jupdate.Op = naming.Delete
101 }
102 if err == nil {
103 updates = append(updates, &jupdate)
104 }
105 }
106 return updates, nil
107}
108
109func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
110 // Use serialized request so resolution still works if the target etcd
111 // server is partitioned away from the quorum.
112 resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
113 if gw.err = err; err != nil {
114 return nil, err
115 }
116
117 updates := make([]*naming.Update, 0, len(resp.Kvs))
118 for _, kv := range resp.Kvs {
119 var jupdate naming.Update
120 if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
121 continue
122 }
123 updates = append(updates, &jupdate)
124 }
125
126 opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
127 gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
128 return updates, nil
129}
130
131func (gw *gRPCWatcher) Close() { gw.cancel() }