// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package etcdserver
16
17import (
18 "encoding/json"
19 "path"
20 "time"
21
22 "github.com/coreos/etcd/etcdserver/api"
23 "github.com/coreos/etcd/etcdserver/membership"
24 "github.com/coreos/etcd/pkg/pbutil"
25 "github.com/coreos/etcd/store"
26 "github.com/coreos/go-semver/semver"
27)
28
29// ApplierV2 is the interface for processing V2 raft messages
30type ApplierV2 interface {
31 Delete(r *RequestV2) Response
32 Post(r *RequestV2) Response
33 Put(r *RequestV2) Response
34 QGet(r *RequestV2) Response
35 Sync(r *RequestV2) Response
36}
37
38func NewApplierV2(s store.Store, c *membership.RaftCluster) ApplierV2 {
39 return &applierV2store{store: s, cluster: c}
40}
41
42type applierV2store struct {
43 store store.Store
44 cluster *membership.RaftCluster
45}
46
47func (a *applierV2store) Delete(r *RequestV2) Response {
48 switch {
49 case r.PrevIndex > 0 || r.PrevValue != "":
50 return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
51 default:
52 return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
53 }
54}
55
56func (a *applierV2store) Post(r *RequestV2) Response {
57 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
58}
59
60func (a *applierV2store) Put(r *RequestV2) Response {
61 ttlOptions := r.TTLOptions()
62 exists, existsSet := pbutil.GetBool(r.PrevExist)
63 switch {
64 case existsSet:
65 if exists {
66 if r.PrevIndex == 0 && r.PrevValue == "" {
67 return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
68 }
69 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
70 }
71 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
72 case r.PrevIndex > 0 || r.PrevValue != "":
73 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
74 default:
75 if storeMemberAttributeRegexp.MatchString(r.Path) {
76 id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
77 var attr membership.Attributes
78 if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
79 plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
80 }
81 if a.cluster != nil {
82 a.cluster.UpdateAttributes(id, attr)
83 }
84 // return an empty response since there is no consumer.
85 return Response{}
86 }
87 if r.Path == membership.StoreClusterVersionKey() {
88 if a.cluster != nil {
89 a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability)
90 }
91 // return an empty response since there is no consumer.
92 return Response{}
93 }
94 return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
95 }
96}
97
98func (a *applierV2store) QGet(r *RequestV2) Response {
99 return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
100}
101
102func (a *applierV2store) Sync(r *RequestV2) Response {
103 a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
104 return Response{}
105}
106
107// applyV2Request interprets r as a call to store.X and returns a Response interpreted
108// from store.Event
109func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
110 defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
111
112 switch r.Method {
113 case "POST":
114 return s.applyV2.Post(r)
115 case "PUT":
116 return s.applyV2.Put(r)
117 case "DELETE":
118 return s.applyV2.Delete(r)
119 case "QGET":
120 return s.applyV2.QGet(r)
121 case "SYNC":
122 return s.applyV2.Sync(r)
123 default:
124 // This should never be reached, but just in case:
125 return Response{Err: ErrUnknownMethod}
126 }
127}
128
129func (r *RequestV2) TTLOptions() store.TTLOptionSet {
130 refresh, _ := pbutil.GetBool(r.Refresh)
131 ttlOptions := store.TTLOptionSet{Refresh: refresh}
132 if r.Expiration != 0 {
133 ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
134 }
135 return ttlOptions
136}
137
138func toResponse(ev *store.Event, err error) Response {
139 return Response{Event: ev, Err: err}
140}