blob: c44979094c6d802218ae76532a6cee585384eeee [file] [log] [blame]
mpagenkoaf801632020-07-03 10:00:42 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
khenaidoo7d3c5582021-08-11 18:09:44 -040022import (
23 "context"
24 "sync"
25)
26
mpagenkoaf801632020-07-03 10:00:42 +000027type TSerializer struct {
28 Transport *TMemoryBuffer
29 Protocol TProtocol
30}
31
32type TStruct interface {
khenaidoo7d3c5582021-08-11 18:09:44 -040033 Write(ctx context.Context, p TProtocol) error
34 Read(ctx context.Context, p TProtocol) error
mpagenkoaf801632020-07-03 10:00:42 +000035}
36
37func NewTSerializer() *TSerializer {
38 transport := NewTMemoryBufferLen(1024)
khenaidoo7d3c5582021-08-11 18:09:44 -040039 protocol := NewTBinaryProtocolTransport(transport)
mpagenkoaf801632020-07-03 10:00:42 +000040
41 return &TSerializer{
khenaidoo7d3c5582021-08-11 18:09:44 -040042 Transport: transport,
43 Protocol: protocol,
44 }
mpagenkoaf801632020-07-03 10:00:42 +000045}
46
khenaidoo7d3c5582021-08-11 18:09:44 -040047func (t *TSerializer) WriteString(ctx context.Context, msg TStruct) (s string, err error) {
mpagenkoaf801632020-07-03 10:00:42 +000048 t.Transport.Reset()
49
khenaidoo7d3c5582021-08-11 18:09:44 -040050 if err = msg.Write(ctx, t.Protocol); err != nil {
mpagenkoaf801632020-07-03 10:00:42 +000051 return
52 }
53
khenaidoo7d3c5582021-08-11 18:09:44 -040054 if err = t.Protocol.Flush(ctx); err != nil {
mpagenkoaf801632020-07-03 10:00:42 +000055 return
56 }
khenaidoo7d3c5582021-08-11 18:09:44 -040057 if err = t.Transport.Flush(ctx); err != nil {
mpagenkoaf801632020-07-03 10:00:42 +000058 return
59 }
60
61 return t.Transport.String(), nil
62}
63
khenaidoo7d3c5582021-08-11 18:09:44 -040064func (t *TSerializer) Write(ctx context.Context, msg TStruct) (b []byte, err error) {
mpagenkoaf801632020-07-03 10:00:42 +000065 t.Transport.Reset()
66
khenaidoo7d3c5582021-08-11 18:09:44 -040067 if err = msg.Write(ctx, t.Protocol); err != nil {
mpagenkoaf801632020-07-03 10:00:42 +000068 return
69 }
70
khenaidoo7d3c5582021-08-11 18:09:44 -040071 if err = t.Protocol.Flush(ctx); err != nil {
mpagenkoaf801632020-07-03 10:00:42 +000072 return
73 }
74
khenaidoo7d3c5582021-08-11 18:09:44 -040075 if err = t.Transport.Flush(ctx); err != nil {
mpagenkoaf801632020-07-03 10:00:42 +000076 return
77 }
78
79 b = append(b, t.Transport.Bytes()...)
80 return
81}
khenaidoo7d3c5582021-08-11 18:09:44 -040082
// TSerializerPool is the thread-safe version of TSerializer. Under the hood
// it keeps a resource pool of TSerializer values.
//
// It must be initialized with either NewTSerializerPool or
// NewTSerializerPoolSizeFactory.
type TSerializerPool struct {
	// pool holds the *TSerializer values handed out by Write and WriteString.
	pool sync.Pool
}
91
92// NewTSerializerPool creates a new TSerializerPool.
93//
94// NewTSerializer can be used as the arg here.
95func NewTSerializerPool(f func() *TSerializer) *TSerializerPool {
96 return &TSerializerPool{
97 pool: sync.Pool{
98 New: func() interface{} {
99 return f()
100 },
101 },
102 }
103}
104
105// NewTSerializerPoolSizeFactory creates a new TSerializerPool with the given
106// size and protocol factory.
107//
108// Note that the size is not the limit. The TMemoryBuffer underneath can grow
109// larger than that. It just dictates the initial size.
110func NewTSerializerPoolSizeFactory(size int, factory TProtocolFactory) *TSerializerPool {
111 return &TSerializerPool{
112 pool: sync.Pool{
113 New: func() interface{} {
114 transport := NewTMemoryBufferLen(size)
115 protocol := factory.GetProtocol(transport)
116
117 return &TSerializer{
118 Transport: transport,
119 Protocol: protocol,
120 }
121 },
122 },
123 }
124}
125
126func (t *TSerializerPool) WriteString(ctx context.Context, msg TStruct) (string, error) {
127 s := t.pool.Get().(*TSerializer)
128 defer t.pool.Put(s)
129 return s.WriteString(ctx, msg)
130}
131
132func (t *TSerializerPool) Write(ctx context.Context, msg TStruct) ([]byte, error) {
133 s := t.pool.Get().(*TSerializer)
134 defer t.pool.Put(s)
135 return s.Write(ctx, msg)
136}