blob: 9238b2dc58030b4c60664c0a8c955803403e1baf [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "context"
19 "time"
20
21 "go.etcd.io/etcd/etcdserver/api/v2store"
22 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
23)
24
25type RequestV2 pb.Request
26
27type RequestV2Handler interface {
28 Post(ctx context.Context, r *RequestV2) (Response, error)
29 Put(ctx context.Context, r *RequestV2) (Response, error)
30 Delete(ctx context.Context, r *RequestV2) (Response, error)
31 QGet(ctx context.Context, r *RequestV2) (Response, error)
32 Get(ctx context.Context, r *RequestV2) (Response, error)
33 Head(ctx context.Context, r *RequestV2) (Response, error)
34}
35
36type reqV2HandlerEtcdServer struct {
37 reqV2HandlerStore
38 s *EtcdServer
39}
40
41type reqV2HandlerStore struct {
42 store v2store.Store
43 applier ApplierV2
44}
45
46func NewStoreRequestV2Handler(s v2store.Store, applier ApplierV2) RequestV2Handler {
47 return &reqV2HandlerStore{s, applier}
48}
49
50func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) {
51 return a.applier.Post(r), nil
52}
53
54func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
55 return a.applier.Put(r), nil
56}
57
58func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
59 return a.applier.Delete(r), nil
60}
61
62func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) {
63 return a.applier.QGet(r), nil
64}
65
66func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) {
67 if r.Wait {
68 wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
69 return Response{Watcher: wc}, err
70 }
71 ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
72 return Response{Event: ev}, err
73}
74
75func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) {
76 ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
77 return Response{Event: ev}, err
78}
79
80func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) {
81 return a.processRaftRequest(ctx, r)
82}
83
84func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) {
85 return a.processRaftRequest(ctx, r)
86}
87
88func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) {
89 return a.processRaftRequest(ctx, r)
90}
91
92func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) {
93 return a.processRaftRequest(ctx, r)
94}
95
96func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) {
97 data, err := ((*pb.Request)(r)).Marshal()
98 if err != nil {
99 return Response{}, err
100 }
101 ch := a.s.w.Register(r.ID)
102
103 start := time.Now()
104 a.s.r.Propose(ctx, data)
105 proposalsPending.Inc()
106 defer proposalsPending.Dec()
107
108 select {
109 case x := <-ch:
110 resp := x.(Response)
111 return resp, resp.Err
112 case <-ctx.Done():
113 proposalsFailed.Inc()
114 a.s.w.Trigger(r.ID, nil) // GC wait
115 return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
116 case <-a.s.stopping:
117 }
118 return Response{}, ErrStopped
119}
120
121func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
122 r.ID = s.reqIDGen.Next()
123 h := &reqV2HandlerEtcdServer{
124 reqV2HandlerStore: reqV2HandlerStore{
125 store: s.v2store,
126 applier: s.applyV2,
127 },
128 s: s,
129 }
130 rp := &r
131 resp, err := ((*RequestV2)(rp)).Handle(ctx, h)
132 resp.Term, resp.Index = s.Term(), s.CommittedIndex()
133 return resp, err
134}
135
136// Handle interprets r and performs an operation on s.store according to r.Method
137// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
138// Quorum == true, r will be sent through consensus before performing its
139// respective operation. Do will block until an action is performed or there is
140// an error.
141func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) {
142 if r.Method == "GET" && r.Quorum {
143 r.Method = "QGET"
144 }
145 switch r.Method {
146 case "POST":
147 return v2api.Post(ctx, r)
148 case "PUT":
149 return v2api.Put(ctx, r)
150 case "DELETE":
151 return v2api.Delete(ctx, r)
152 case "QGET":
153 return v2api.QGet(ctx, r)
154 case "GET":
155 return v2api.Get(ctx, r)
156 case "HEAD":
157 return v2api.Head(ctx, r)
158 }
159 return Response{}, ErrUnknownMethod
160}
161
162func (r *RequestV2) String() string {
163 rpb := pb.Request(*r)
164 return rpb.String()
165}