blob: 59c543eafe850e6072f407ea6eaaffd3df9cb241 [file] [log] [blame]
kdarapuf0c0e382019-09-30 05:26:31 +05301// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mockserver
16
17import (
18 "context"
19 "fmt"
20 "io/ioutil"
21 "net"
22 "os"
23 "sync"
24
25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/resolver"
29)
30
31// MockServer provides a mocked out grpc server of the etcdserver interface.
32type MockServer struct {
33 ln net.Listener
34 Network string
35 Address string
36 GrpcServer *grpc.Server
37}
38
39func (ms *MockServer) ResolverAddress() resolver.Address {
40 switch ms.Network {
41 case "unix":
42 return resolver.Address{Addr: fmt.Sprintf("unix://%s", ms.Address)}
43 case "tcp":
44 return resolver.Address{Addr: ms.Address}
45 default:
46 panic("illegal network type: " + ms.Network)
47 }
48}
49
50// MockServers provides a cluster of mocket out gprc servers of the etcdserver interface.
51type MockServers struct {
52 mu sync.RWMutex
53 Servers []*MockServer
54 wg sync.WaitGroup
55}
56
57// StartMockServers creates the desired count of mock servers
58// and starts them.
59func StartMockServers(count int) (ms *MockServers, err error) {
60 return StartMockServersOnNetwork(count, "tcp")
61}
62
63// StartMockServersOnNetwork creates mock servers on either 'tcp' or 'unix' sockets.
64func StartMockServersOnNetwork(count int, network string) (ms *MockServers, err error) {
65 switch network {
66 case "tcp":
67 return startMockServersTcp(count)
68 case "unix":
69 return startMockServersUnix(count)
70 default:
71 return nil, fmt.Errorf("unsupported network type: %s", network)
72 }
73}
74
75func startMockServersTcp(count int) (ms *MockServers, err error) {
76 addrs := make([]string, 0, count)
77 for i := 0; i < count; i++ {
78 addrs = append(addrs, "localhost:0")
79 }
80 return startMockServers("tcp", addrs)
81}
82
83func startMockServersUnix(count int) (ms *MockServers, err error) {
84 dir := os.TempDir()
85 addrs := make([]string, 0, count)
86 for i := 0; i < count; i++ {
87 f, err := ioutil.TempFile(dir, "etcd-unix-so-")
88 if err != nil {
89 return nil, fmt.Errorf("failed to allocate temp file for unix socket: %v", err)
90 }
91 fn := f.Name()
92 err = os.Remove(fn)
93 if err != nil {
94 return nil, fmt.Errorf("failed to remove temp file before creating unix socket: %v", err)
95 }
96 addrs = append(addrs, fn)
97 }
98 return startMockServers("unix", addrs)
99}
100
101func startMockServers(network string, addrs []string) (ms *MockServers, err error) {
102 ms = &MockServers{
103 Servers: make([]*MockServer, len(addrs)),
104 wg: sync.WaitGroup{},
105 }
106 defer func() {
107 if err != nil {
108 ms.Stop()
109 }
110 }()
111 for idx, addr := range addrs {
112 ln, err := net.Listen(network, addr)
113 if err != nil {
114 return nil, fmt.Errorf("failed to listen %v", err)
115 }
116 ms.Servers[idx] = &MockServer{ln: ln, Network: network, Address: ln.Addr().String()}
117 ms.StartAt(idx)
118 }
119 return ms, nil
120}
121
122// StartAt restarts mock server at given index.
123func (ms *MockServers) StartAt(idx int) (err error) {
124 ms.mu.Lock()
125 defer ms.mu.Unlock()
126
127 if ms.Servers[idx].ln == nil {
128 ms.Servers[idx].ln, err = net.Listen(ms.Servers[idx].Network, ms.Servers[idx].Address)
129 if err != nil {
130 return fmt.Errorf("failed to listen %v", err)
131 }
132 }
133
134 svr := grpc.NewServer()
135 pb.RegisterKVServer(svr, &mockKVServer{})
136 ms.Servers[idx].GrpcServer = svr
137
138 ms.wg.Add(1)
139 go func(svr *grpc.Server, l net.Listener) {
140 svr.Serve(l)
141 }(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln)
142 return nil
143}
144
145// StopAt stops mock server at given index.
146func (ms *MockServers) StopAt(idx int) {
147 ms.mu.Lock()
148 defer ms.mu.Unlock()
149
150 if ms.Servers[idx].ln == nil {
151 return
152 }
153
154 ms.Servers[idx].GrpcServer.Stop()
155 ms.Servers[idx].GrpcServer = nil
156 ms.Servers[idx].ln = nil
157 ms.wg.Done()
158}
159
160// Stop stops the mock server, immediately closing all open connections and listeners.
161func (ms *MockServers) Stop() {
162 for idx := range ms.Servers {
163 ms.StopAt(idx)
164 }
165 ms.wg.Wait()
166}
167
168type mockKVServer struct{}
169
170func (m *mockKVServer) Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) {
171 return &pb.RangeResponse{}, nil
172}
173
174func (m *mockKVServer) Put(context.Context, *pb.PutRequest) (*pb.PutResponse, error) {
175 return &pb.PutResponse{}, nil
176}
177
178func (m *mockKVServer) DeleteRange(context.Context, *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
179 return &pb.DeleteRangeResponse{}, nil
180}
181
182func (m *mockKVServer) Txn(context.Context, *pb.TxnRequest) (*pb.TxnResponse, error) {
183 return &pb.TxnResponse{}, nil
184}
185
186func (m *mockKVServer) Compact(context.Context, *pb.CompactionRequest) (*pb.CompactionResponse, error) {
187 return &pb.CompactionResponse{}, nil
188}