blob: 14ab1190ed9885edf98be32da58228db117b48b8 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
import (
"encoding/json"
"fmt"
"path"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/types"
"github.com/coreos/go-semver/semver"
)
const (
// attributesSuffix is the last path element, under a member's store key,
// of the node that holds the member's JSON-encoded Attributes.
attributesSuffix = "attributes"
// raftAttributesSuffix is the last path element, under a member's store key,
// of the node that holds the member's JSON-encoded RaftAttributes.
raftAttributesSuffix = "raftAttributes"
// storePrefix is the prefix for storing membership-related information in
// the store provided by the store package.
storePrefix = "/0"
)
var (
// membersBucketName is the backend bucket that holds JSON-encoded members,
// keyed by backendMemberKey.
membersBucketName = []byte("members")
// membersRemovedBucketName is the backend bucket that records removed
// members, keyed by backendMemberKey.
membersRemovedBucketName = []byte("members_removed")
// clusterBucketName is the backend bucket that holds the cluster version.
clusterBucketName = []byte("cluster")
// StoreMembersPrefix is the store path under which each member's key lives.
StoreMembersPrefix = path.Join(storePrefix, "members")
// storeRemovedMembersPrefix is the store path under which removed members
// are recorded.
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
// mustSaveMemberToBackend writes the JSON encoding of m into the members
// bucket of be, keyed by backendMemberKey(m.ID).
// It panics if m cannot be marshaled.
func mustSaveMemberToBackend(be backend.Backend, m *Member) {
	mkey := backendMemberKey(m.ID)
	mvalue, err := json.Marshal(m)
	if err != nil {
		// The whole member is marshaled here, not only its raftAttributes.
		plog.Panicf("marshal member should never fail: %v", err)
	}

	tx := be.BatchTx()
	tx.Lock()
	// Unlock via defer, like the other backend helpers in this file, so the
	// lock is not left held if the write below panics.
	defer tx.Unlock()
	tx.UnsafePut(membersBucketName, mkey, mvalue)
}
// mustDeleteMemberFromBackend deletes the member identified by id from the
// members bucket of be and records the same key in the removed-members bucket
// with the value "removed". Both writes happen under a single lock of the
// batch transaction.
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
	mkey := backendMemberKey(id)

	tx := be.BatchTx()
	tx.Lock()
	// Unlock via defer, like the other backend helpers in this file, so the
	// lock is not left held if one of the writes below panics.
	defer tx.Unlock()
	tx.UnsafeDelete(membersBucketName, mkey)
	tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
}
// mustSaveClusterVersionToBackend stores the string form of ver in the cluster
// bucket of be under backendClusterVersionKey().
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
	batch := be.BatchTx()
	batch.Lock()
	defer batch.Unlock()

	value := []byte(ver.String())
	batch.UnsafePut(clusterBucketName, backendClusterVersionKey(), value)
}
// mustSaveMemberToStore creates the raftAttributes node of member m in s,
// holding the JSON encoding of m.RaftAttributes with a permanent expire time.
// It panics if marshaling or the create fails.
func mustSaveMemberToStore(s v2store.Store, m *Member) {
	raw, err := json.Marshal(m.RaftAttributes)
	if err != nil {
		plog.Panicf("marshal raftAttributes should never fail: %v", err)
	}

	key := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
	ttl := v2store.TTLOptionSet{ExpireTime: v2store.Permanent}
	_, err = s.Create(key, false, string(raw), false, ttl)
	if err != nil {
		plog.Panicf("create raftAttributes should never fail: %v", err)
	}
}
// mustDeleteMemberFromStore deletes the store key of member id from s and then
// creates the matching removed-member key with an empty value and a permanent
// expire time. It panics if either store operation fails.
func mustDeleteMemberFromStore(s v2store.Store, id types.ID) {
	_, err := s.Delete(MemberStoreKey(id), true, true)
	if err != nil {
		plog.Panicf("delete member should never fail: %v", err)
	}

	ttl := v2store.TTLOptionSet{ExpireTime: v2store.Permanent}
	_, err = s.Create(RemovedMemberStoreKey(id), false, "", false, ttl)
	if err != nil {
		plog.Panicf("create removedMember should never fail: %v", err)
	}
}
// mustUpdateMemberInStore updates the raftAttributes node of member m in s
// with the JSON encoding of m.RaftAttributes and a permanent expire time.
// It panics if marshaling or the update fails.
func mustUpdateMemberInStore(s v2store.Store, m *Member) {
	raw, err := json.Marshal(m.RaftAttributes)
	if err != nil {
		plog.Panicf("marshal raftAttributes should never fail: %v", err)
	}

	key := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
	ttl := v2store.TTLOptionSet{ExpireTime: v2store.Permanent}
	_, err = s.Update(key, string(raw), ttl)
	if err != nil {
		plog.Panicf("update raftAttributes should never fail: %v", err)
	}
}
// mustUpdateMemberAttrInStore sets the attributes node of member m in s to the
// JSON encoding of m.Attributes with a permanent expire time.
// It panics if marshaling or the set fails.
func mustUpdateMemberAttrInStore(s v2store.Store, m *Member) {
	b, err := json.Marshal(m.Attributes)
	if err != nil {
		// This function handles Attributes, not RaftAttributes; the message
		// names the value that actually failed to marshal.
		plog.Panicf("marshal attributes should never fail: %v", err)
	}

	p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
	if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
		plog.Panicf("update attributes should never fail: %v", err)
	}
}
// mustSaveClusterVersionToStore sets StoreClusterVersionKey() in s to the
// string form of ver with a permanent expire time. It panics if the set fails.
func mustSaveClusterVersionToStore(s v2store.Store, ver *semver.Version) {
	ttl := v2store.TTLOptionSet{ExpireTime: v2store.Permanent}
	_, err := s.Set(StoreClusterVersionKey(), false, ver.String(), ttl)
	if err != nil {
		plog.Panicf("save cluster version should never fail: %v", err)
	}
}
// nodeToMember builds a member from a key value node.
// The member ID is parsed from the last element of n.Key. The only children
// allowed under n are the raftAttributes and attributes keys; raftAttributes
// is required and attributes is optional.
// The child nodes of the given node are expected to be sorted by key; the
// lookup below is by key and does not itself depend on that order.
//
// It returns an error for an unknown child key, a child that carries no
// value, a missing raftAttributes key, or a value that fails to decode as
// JSON. When only the attributes fail to decode, the member populated so far
// is returned together with the error, as before.
func nodeToMember(n *v2store.NodeExtern) (*Member, error) {
	m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
	raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
	attrKey := path.Join(n.Key, attributesSuffix)

	attrs := make(map[string][]byte, len(n.Nodes))
	for _, nn := range n.Nodes {
		if nn.Key != raftAttrKey && nn.Key != attrKey {
			return nil, fmt.Errorf("unknown key %q", nn.Key)
		}
		// Guard the dereference: a child without a value would otherwise
		// panic with a nil pointer dereference.
		if nn.Value == nil {
			return nil, fmt.Errorf("key %q has no value", nn.Key)
		}
		attrs[nn.Key] = []byte(*nn.Value)
	}

	data, ok := attrs[raftAttrKey]
	if !ok {
		return nil, fmt.Errorf("raftAttributes key doesn't exist")
	}
	if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
		return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
	}

	if data, ok := attrs[attrKey]; ok {
		if err := json.Unmarshal(data, &m.Attributes); err != nil {
			return m, fmt.Errorf("unmarshal attributes error: %v", err)
		}
	}
	return m, nil
}
// backendMemberKey returns the backend key for a member: the bytes of the
// string form of id.
func backendMemberKey(id types.ID) []byte {
	name := id.String()
	return []byte(name)
}
// backendClusterVersionKey returns the backend key under which the cluster
// version is stored.
func backendClusterVersionKey() []byte {
	const key = "clusterVersion"
	return []byte(key)
}
// mustCreateBackendBuckets creates the members, removed-members and cluster
// buckets in be, in that order, under a single lock of the batch transaction.
func mustCreateBackendBuckets(be backend.Backend) {
	batch := be.BatchTx()
	batch.Lock()
	defer batch.Unlock()

	buckets := [][]byte{
		membersBucketName,
		membersRemovedBucketName,
		clusterBucketName,
	}
	for _, bucket := range buckets {
		batch.UnsafeCreateBucket(bucket)
	}
}
// MemberStoreKey returns the store key of the member with the given id:
// StoreMembersPrefix joined with the string form of id.
func MemberStoreKey(id types.ID) string {
	name := id.String()
	return path.Join(StoreMembersPrefix, name)
}
// StoreClusterVersionKey returns the store key that holds the cluster version:
// storePrefix joined with "version".
func StoreClusterVersionKey() string {
	const leaf = "version"
	return path.Join(storePrefix, leaf)
}
// MemberAttributesStorePath returns the store path of the attributes node of
// the member with the given id.
func MemberAttributesStorePath(id types.ID) string {
	memberKey := MemberStoreKey(id)
	return path.Join(memberKey, attributesSuffix)
}
// MustParseMemberIDFromKey parses a member ID from the last path element of
// key. It panics if that element is not a valid ID string.
func MustParseMemberIDFromKey(key string) types.ID {
	base := path.Base(key)
	id, err := types.IDFromString(base)
	if err == nil {
		return id
	}
	plog.Panicf("unexpected parse member id error: %v", err)
	return id
}
// RemovedMemberStoreKey returns the store key that records removal of the
// member with the given id: storeRemovedMembersPrefix joined with the string
// form of id.
func RemovedMemberStoreKey(id types.ID) string {
	name := id.String()
	return path.Join(storeRemovedMembersPrefix, name)
}