| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package naming |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| |
| etcd "github.com/coreos/etcd/clientv3" |
| |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/naming" |
| "google.golang.org/grpc/status" |
| ) |
| |
| var ErrWatcherClosed = fmt.Errorf("naming: watch closed") |
| |
| // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. |
| type GRPCResolver struct { |
| // Client is an initialized etcd client. |
| Client *etcd.Client |
| } |
| |
| func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) { |
| switch nm.Op { |
| case naming.Add: |
| var v []byte |
| if v, err = json.Marshal(nm); err != nil { |
| return status.Error(codes.InvalidArgument, err.Error()) |
| } |
| _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) |
| case naming.Delete: |
| _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) |
| default: |
| return status.Error(codes.InvalidArgument, "naming: bad naming op") |
| } |
| return err |
| } |
| |
| func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { |
| ctx, cancel := context.WithCancel(context.Background()) |
| w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} |
| return w, nil |
| } |
| |
| type gRPCWatcher struct { |
| c *etcd.Client |
| target string |
| ctx context.Context |
| cancel context.CancelFunc |
| wch etcd.WatchChan |
| err error |
| } |
| |
| // Next gets the next set of updates from the etcd resolver. |
| // Calls to Next should be serialized; concurrent calls are not safe since |
| // there is no way to reconcile the update ordering. |
| func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { |
| if gw.wch == nil { |
| // first Next() returns all addresses |
| return gw.firstNext() |
| } |
| if gw.err != nil { |
| return nil, gw.err |
| } |
| |
| // process new events on target/* |
| wr, ok := <-gw.wch |
| if !ok { |
| gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) |
| return nil, gw.err |
| } |
| if gw.err = wr.Err(); gw.err != nil { |
| return nil, gw.err |
| } |
| |
| updates := make([]*naming.Update, 0, len(wr.Events)) |
| for _, e := range wr.Events { |
| var jupdate naming.Update |
| var err error |
| switch e.Type { |
| case etcd.EventTypePut: |
| err = json.Unmarshal(e.Kv.Value, &jupdate) |
| jupdate.Op = naming.Add |
| case etcd.EventTypeDelete: |
| err = json.Unmarshal(e.PrevKv.Value, &jupdate) |
| jupdate.Op = naming.Delete |
| } |
| if err == nil { |
| updates = append(updates, &jupdate) |
| } |
| } |
| return updates, nil |
| } |
| |
| func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { |
| // Use serialized request so resolution still works if the target etcd |
| // server is partitioned away from the quorum. |
| resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) |
| if gw.err = err; err != nil { |
| return nil, err |
| } |
| |
| updates := make([]*naming.Update, 0, len(resp.Kvs)) |
| for _, kv := range resp.Kvs { |
| var jupdate naming.Update |
| if err := json.Unmarshal(kv.Value, &jupdate); err != nil { |
| continue |
| } |
| updates = append(updates, &jupdate) |
| } |
| |
| opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} |
| gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) |
| return updates, nil |
| } |
| |
| func (gw *gRPCWatcher) Close() { gw.cancel() } |