blob: 9fb3c01498216223ab6906df992bc685da6daf71 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved.
2// Use of this source code is governed by a MIT license found in the LICENSE file.
3
4package codec
5
6import (
7 "bufio"
8 "errors"
9 "io"
10 "net/rpc"
11 "sync"
12)
13
14// Rpc provides a rpc Server or Client Codec for rpc communication.
15type Rpc interface {
16 ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
17 ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
18}
19
20// RPCOptions holds options specific to rpc functionality
21type RPCOptions struct {
22 // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
23 //
24 // Set RPCNoBuffer=true to turn buffering off.
25 // Buffering can still be done if buffered connections are passed in, or
26 // buffering is configured on the handle.
27 RPCNoBuffer bool
28}
29
30// rpcCodec defines the struct members and common methods.
31type rpcCodec struct {
32 c io.Closer
33 r io.Reader
34 w io.Writer
35 f ioFlusher
36
37 dec *Decoder
38 enc *Encoder
39 // bw *bufio.Writer
40 // br *bufio.Reader
41 mu sync.Mutex
42 h Handle
43
44 cls bool
45 clsmu sync.RWMutex
46 clsErr error
47}
48
49func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
50 // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
51 return newRPCCodec2(conn, conn, conn, h)
52}
53
54func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
55 // defensive: ensure that jsonH has TermWhitespace turned on.
56 if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
57 panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
58 }
59 // always ensure that we use a flusher, and always flush what was written to the connection.
60 // we lose nothing by using a buffered writer internally.
61 f, ok := w.(ioFlusher)
62 bh := h.getBasicHandle()
63 if !bh.RPCNoBuffer {
64 if bh.WriterBufferSize <= 0 {
65 if !ok {
66 bw := bufio.NewWriter(w)
67 f, w = bw, bw
68 }
69 }
70 if bh.ReaderBufferSize <= 0 {
71 if _, ok = w.(ioPeeker); !ok {
72 if _, ok = w.(ioBuffered); !ok {
73 br := bufio.NewReader(r)
74 r = br
75 }
76 }
77 }
78 }
79 return rpcCodec{
80 c: c,
81 w: w,
82 r: r,
83 f: f,
84 h: h,
85 enc: NewEncoder(w, h),
86 dec: NewDecoder(r, h),
87 }
88}
89
90func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
91 if c.isClosed() {
92 return c.clsErr
93 }
94 err = c.enc.Encode(obj1)
95 if err == nil {
96 if writeObj2 {
97 err = c.enc.Encode(obj2)
98 }
99 // if err == nil && c.f != nil {
100 // err = c.f.Flush()
101 // }
102 }
103 if c.f != nil {
104 if err == nil {
105 err = c.f.Flush()
106 } else {
107 _ = c.f.Flush() // swallow flush error, so we maintain prior error on write
108 }
109 }
110 return
111}
112
113func (c *rpcCodec) swallow(err *error) {
114 defer panicToErr(c.dec, err)
115 c.dec.swallow()
116}
117
118func (c *rpcCodec) read(obj interface{}) (err error) {
119 if c.isClosed() {
120 return c.clsErr
121 }
122 //If nil is passed in, we should read and discard
123 if obj == nil {
124 // var obj2 interface{}
125 // return c.dec.Decode(&obj2)
126 c.swallow(&err)
127 return
128 }
129 return c.dec.Decode(obj)
130}
131
132func (c *rpcCodec) isClosed() (b bool) {
133 if c.c != nil {
134 c.clsmu.RLock()
135 b = c.cls
136 c.clsmu.RUnlock()
137 }
138 return
139}
140
141func (c *rpcCodec) Close() error {
142 if c.c == nil || c.isClosed() {
143 return c.clsErr
144 }
145 c.clsmu.Lock()
146 c.cls = true
147 c.clsErr = c.c.Close()
148 c.clsmu.Unlock()
149 return c.clsErr
150}
151
152func (c *rpcCodec) ReadResponseBody(body interface{}) error {
153 return c.read(body)
154}
155
156// -------------------------------------
157
158type goRpcCodec struct {
159 rpcCodec
160}
161
162func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
163 // Must protect for concurrent access as per API
164 c.mu.Lock()
165 defer c.mu.Unlock()
166 return c.write(r, body, true)
167}
168
169func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
170 c.mu.Lock()
171 defer c.mu.Unlock()
172 return c.write(r, body, true)
173}
174
175func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
176 return c.read(r)
177}
178
179func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
180 return c.read(r)
181}
182
183func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
184 return c.read(body)
185}
186
187// -------------------------------------
188
189// goRpc is the implementation of Rpc that uses the communication protocol
190// as defined in net/rpc package.
191type goRpc struct{}
192
193// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
194//
195// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
196//
197// For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
198// This ensures we use an adequate buffer during reading and writing.
199// If not configured, we will internally initialize and use a buffer during reads and writes.
200// This can be turned off via the RPCNoBuffer option on the Handle.
201// var handle codec.JsonHandle
202// handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
203//
204// Example 1: one way of configuring buffering explicitly:
205// var handle codec.JsonHandle // codec handle
206// handle.ReaderBufferSize = 1024
207// handle.WriterBufferSize = 1024
208// var conn io.ReadWriteCloser // connection got from a socket
209// var serverCodec = GoRpc.ServerCodec(conn, handle)
210// var clientCodec = GoRpc.ClientCodec(conn, handle)
211//
212// Example 2: you can also explicitly create a buffered connection yourself,
213// and not worry about configuring the buffer sizes in the Handle.
214// var handle codec.Handle // codec handle
215// var conn io.ReadWriteCloser // connection got from a socket
216// var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
217// io.Closer
218// *bufio.Reader
219// *bufio.Writer
220// }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
221// var serverCodec = GoRpc.ServerCodec(bufconn, handle)
222// var clientCodec = GoRpc.ClientCodec(bufconn, handle)
223//
224var GoRpc goRpc
225
226func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
227 return &goRpcCodec{newRPCCodec(conn, h)}
228}
229
230func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
231 return &goRpcCodec{newRPCCodec(conn, h)}
232}