blob: c44979094c6d802218ae76532a6cee585384eeee [file] [log] [blame]
Elia Battistonc8d0d462022-02-22 16:30:51 +01001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
23 "context"
24 "sync"
25)
26
27type TSerializer struct {
28 Transport *TMemoryBuffer
29 Protocol TProtocol
30}
31
32type TStruct interface {
33 Write(ctx context.Context, p TProtocol) error
34 Read(ctx context.Context, p TProtocol) error
35}
36
37func NewTSerializer() *TSerializer {
38 transport := NewTMemoryBufferLen(1024)
39 protocol := NewTBinaryProtocolTransport(transport)
40
41 return &TSerializer{
42 Transport: transport,
43 Protocol: protocol,
44 }
45}
46
47func (t *TSerializer) WriteString(ctx context.Context, msg TStruct) (s string, err error) {
48 t.Transport.Reset()
49
50 if err = msg.Write(ctx, t.Protocol); err != nil {
51 return
52 }
53
54 if err = t.Protocol.Flush(ctx); err != nil {
55 return
56 }
57 if err = t.Transport.Flush(ctx); err != nil {
58 return
59 }
60
61 return t.Transport.String(), nil
62}
63
64func (t *TSerializer) Write(ctx context.Context, msg TStruct) (b []byte, err error) {
65 t.Transport.Reset()
66
67 if err = msg.Write(ctx, t.Protocol); err != nil {
68 return
69 }
70
71 if err = t.Protocol.Flush(ctx); err != nil {
72 return
73 }
74
75 if err = t.Transport.Flush(ctx); err != nil {
76 return
77 }
78
79 b = append(b, t.Transport.Bytes()...)
80 return
81}
82
83// TSerializerPool is the thread-safe version of TSerializer, it uses resource
84// pool of TSerializer under the hood.
85//
86// It must be initialized with either NewTSerializerPool or
87// NewTSerializerPoolSizeFactory.
88type TSerializerPool struct {
89 pool sync.Pool
90}
91
92// NewTSerializerPool creates a new TSerializerPool.
93//
94// NewTSerializer can be used as the arg here.
95func NewTSerializerPool(f func() *TSerializer) *TSerializerPool {
96 return &TSerializerPool{
97 pool: sync.Pool{
98 New: func() interface{} {
99 return f()
100 },
101 },
102 }
103}
104
105// NewTSerializerPoolSizeFactory creates a new TSerializerPool with the given
106// size and protocol factory.
107//
108// Note that the size is not the limit. The TMemoryBuffer underneath can grow
109// larger than that. It just dictates the initial size.
110func NewTSerializerPoolSizeFactory(size int, factory TProtocolFactory) *TSerializerPool {
111 return &TSerializerPool{
112 pool: sync.Pool{
113 New: func() interface{} {
114 transport := NewTMemoryBufferLen(size)
115 protocol := factory.GetProtocol(transport)
116
117 return &TSerializer{
118 Transport: transport,
119 Protocol: protocol,
120 }
121 },
122 },
123 }
124}
125
126func (t *TSerializerPool) WriteString(ctx context.Context, msg TStruct) (string, error) {
127 s := t.pool.Get().(*TSerializer)
128 defer t.pool.Put(s)
129 return s.WriteString(ctx, msg)
130}
131
132func (t *TSerializerPool) Write(ctx context.Context, msg TStruct) ([]byte, error) {
133 s := t.pool.Get().(*TSerializer)
134 defer t.pool.Put(s)
135 return s.Write(ctx, msg)
136}