/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20package thrift
21
22import (
23 "bufio"
24 "bytes"
25 "compress/zlib"
26 "context"
27 "encoding/binary"
28 "errors"
29 "fmt"
30 "io"
31 "io/ioutil"
32)
33
// size32 is the number of bytes taken by a 32-bit integer, e.g. the frame
// length prefix peeked by ReadFrame and written by Flush.
const size32 = 32 / 8
36
// headerMeta is the fixed-size leading part of a THeader frame. It is decoded
// and encoded as a whole with encoding/binary in big-endian byte order, so the
// field order and field widths here define the wire layout.
type headerMeta struct {
	// MagicFlags carries the THeader magic in the bits selected by
	// THeaderHeaderMask and the flags in the bits selected by
	// THeaderFlagsMask.
	MagicFlags uint32
	// SequenceID is the sequence id of the message.
	SequenceID int32
	// HeaderLength is the length of the variable-size header that follows,
	// counted in 4-byte words.
	HeaderLength uint16
}

// headerMetaSize is the encoded size of headerMeta in bytes
// (uint32 + int32 + uint16).
const headerMetaSize = 4 + 4 + 2
44
// clientType identifies the wire dialect spoken by the peer, as detected by
// THeaderTransport.ReadFrame (or assumed by Flush when nothing was read yet).
type clientType int

// Known clientType values. clientUnknown is deliberately the zero value, so a
// freshly created transport starts out undetected.
const (
	clientUnknown         clientType = 0
	clientHeaders         clientType = 1
	clientFramedBinary    clientType = 2
	clientUnframedBinary  clientType = 3
	clientFramedCompact   clientType = 4
	clientUnframedCompact clientType = 5
)
55
// Constants defined in THeader format:
// https://github.com/apache/thrift/blob/master/doc/specs/HeaderFormat.md
const (
	// THeaderHeaderMagic is the magic value found in the upper 16 bits of
	// the first word of a THeader frame.
	THeaderHeaderMagic uint32 = 0x0fff << 16
	// THeaderHeaderMask selects the magic bits of that word.
	THeaderHeaderMask uint32 = 0xffff << 16
	// THeaderFlagsMask selects the flag bits of that word.
	THeaderFlagsMask uint32 = 0xffff
	// THeaderMaxFrameSize is the largest frame size accepted on read.
	THeaderMaxFrameSize uint32 = 1<<30 - 1
)
64
// THeaderMap is the key/value header collection carried by the THeader
// transport: both keys and values are plain strings.
type THeaderMap map[string]string
67
// THeaderProtocolID is the id of the protocol wrapped inside THeader.
type THeaderProtocolID int32

// Supported THeaderProtocolID values.
const (
	// THeaderProtocolBinary selects the binary protocol.
	THeaderProtocolBinary THeaderProtocolID = 0
	// THeaderProtocolCompact selects the compact protocol.
	THeaderProtocolCompact THeaderProtocolID = 2
	// THeaderProtocolDefault is the protocol used when none is specified.
	THeaderProtocolDefault = THeaderProtocolBinary
)
77
78// Declared globally to avoid repetitive allocations, not really used.
79var globalMemoryBuffer = NewTMemoryBuffer()
80
81// Validate checks whether the THeaderProtocolID is a valid/supported one.
82func (id THeaderProtocolID) Validate() error {
83 _, err := id.GetProtocol(globalMemoryBuffer)
84 return err
85}
86
87// GetProtocol gets the corresponding TProtocol from the wrapped protocol id.
88func (id THeaderProtocolID) GetProtocol(trans TTransport) (TProtocol, error) {
89 switch id {
90 default:
91 return nil, NewTApplicationException(
92 INVALID_PROTOCOL,
93 fmt.Sprintf("THeader protocol id %d not supported", id),
94 )
95 case THeaderProtocolBinary:
96 return NewTBinaryProtocolTransport(trans), nil
97 case THeaderProtocolCompact:
98 return NewTCompactProtocol(trans), nil
99 }
100}
101
// THeaderTransformID is the numeric id of a payload transform.
type THeaderTransformID int32

// THeaderTransformID values.
//
// Values not defined here are not currently supported, namely HMAC and Snappy.
const (
	// TransformNone means no special handling.
	TransformNone THeaderTransformID = 0
	// TransformZlib means the payload is zlib-compressed.
	TransformZlib THeaderTransformID = 1
)

// supportedTransformIDs is the set of transform ids accepted for writing;
// ids that are absent read back as false.
var supportedTransformIDs = map[THeaderTransformID]bool{
	TransformZlib: true,
	TransformNone: true,
}
117
// TransformReader is an io.ReadCloser that handles transforms reading.
//
// Reads go through the embedded Reader, which AddTransform replaces with a
// decoding reader wrapped around the previous one.
type TransformReader struct {
	io.Reader

	// closers collects the readers that need closing, in the order they
	// were added.
	closers []io.Closer
}

// Compile-time check that *TransformReader satisfies io.ReadCloser.
var _ io.ReadCloser = (*TransformReader)(nil)

// NewTransformReaderWithCapacity initializes a TransformReader on top of
// baseReader, pre-allocating room for capacity closers.
//
// If you don't know the closers capacity beforehand,
//
//     &TransformReader{Reader: baseReader}
//
// is sufficient.
func NewTransformReaderWithCapacity(baseReader io.Reader, capacity int) *TransformReader {
	tr := new(TransformReader)
	tr.Reader = baseReader
	tr.closers = make([]io.Closer, 0, capacity)
	return tr
}

// Close closes the registered closers, last added first.
//
// It stops at the first error and returns it; closers that come after the
// failing one in that order are not closed.
func (tr *TransformReader) Close() error {
	for remaining := len(tr.closers); remaining > 0; remaining-- {
		err := tr.closers[remaining-1].Close()
		if err != nil {
			return err
		}
	}
	return nil
}
153
154// AddTransform adds a transform.
155func (tr *TransformReader) AddTransform(id THeaderTransformID) error {
156 switch id {
157 default:
158 return NewTApplicationException(
159 INVALID_TRANSFORM,
160 fmt.Sprintf("THeaderTransformID %d not supported", id),
161 )
162 case TransformNone:
163 // no-op
164 case TransformZlib:
165 readCloser, err := zlib.NewReader(tr.Reader)
166 if err != nil {
167 return err
168 }
169 tr.Reader = readCloser
170 tr.closers = append(tr.closers, readCloser)
171 }
172 return nil
173}
174
175// TransformWriter is an io.WriteCloser that handles transforms writing.
176type TransformWriter struct {
177 io.Writer
178
179 closers []io.Closer
180}
181
182var _ io.WriteCloser = (*TransformWriter)(nil)
183
184// NewTransformWriter creates a new TransformWriter with base writer and transforms.
185func NewTransformWriter(baseWriter io.Writer, transforms []THeaderTransformID) (io.WriteCloser, error) {
186 writer := &TransformWriter{
187 Writer: baseWriter,
188 closers: make([]io.Closer, 0, len(transforms)),
189 }
190 for _, id := range transforms {
191 if err := writer.AddTransform(id); err != nil {
192 return nil, err
193 }
194 }
195 return writer, nil
196}
197
198// Close calls the underlying closers in appropriate order,
199// stops at and returns the first error encountered.
200func (tw *TransformWriter) Close() error {
201 // Call closers in reversed order
202 for i := len(tw.closers) - 1; i >= 0; i-- {
203 if err := tw.closers[i].Close(); err != nil {
204 return err
205 }
206 }
207 return nil
208}
209
210// AddTransform adds a transform.
211func (tw *TransformWriter) AddTransform(id THeaderTransformID) error {
212 switch id {
213 default:
214 return NewTApplicationException(
215 INVALID_TRANSFORM,
216 fmt.Sprintf("THeaderTransformID %d not supported", id),
217 )
218 case TransformNone:
219 // no-op
220 case TransformZlib:
221 writeCloser := zlib.NewWriter(tw.Writer)
222 tw.Writer = writeCloser
223 tw.closers = append(tw.closers, writeCloser)
224 }
225 return nil
226}
227
// THeaderInfoType is the type id of an info section in the header.
type THeaderInfoType int32

// Supported THeaderInfoType values.
//
// Value 0 is intentionally left unassigned, and the rest of the info types
// are not supported.
const (
	// InfoKeyValue marks a section holding string key/value pairs.
	InfoKeyValue THeaderInfoType = 1
)
237
238// THeaderTransport is a Transport mode that implements THeader.
239//
240// Note that THeaderTransport handles frame and zlib by itself,
241// so the underlying transport should be a raw socket transports (TSocket or TSSLSocket),
242// instead of rich transports like TZlibTransport or TFramedTransport.
243type THeaderTransport struct {
244 SequenceID int32
245 Flags uint32
246
247 transport TTransport
248
249 // THeaderMap for read and write
250 readHeaders THeaderMap
251 writeHeaders THeaderMap
252
253 // Reading related variables.
254 reader *bufio.Reader
255 // When frame is detected, we read the frame fully into frameBuffer.
256 frameBuffer bytes.Buffer
257 // When it's non-nil, Read should read from frameReader instead of
258 // reader, and EOF error indicates end of frame instead of end of all
259 // transport.
260 frameReader io.ReadCloser
261
262 // Writing related variables
263 writeBuffer bytes.Buffer
264 writeTransforms []THeaderTransformID
265
266 clientType clientType
267 protocolID THeaderProtocolID
268 cfg *TConfiguration
269
270 // buffer is used in the following scenarios to avoid repetitive
271 // allocations, while 4 is big enough for all those scenarios:
272 //
273 // * header padding (max size 4)
274 // * write the frame size (size 4)
275 buffer [4]byte
276}
277
278var _ TTransport = (*THeaderTransport)(nil)
279
280// Deprecated: Use NewTHeaderTransportConf instead.
281func NewTHeaderTransport(trans TTransport) *THeaderTransport {
282 return NewTHeaderTransportConf(trans, &TConfiguration{
283 noPropagation: true,
284 })
285}
286
287// NewTHeaderTransportConf creates THeaderTransport from the
288// underlying transport, with given TConfiguration attached.
289//
290// If trans is already a *THeaderTransport, it will be returned as is,
291// but with TConfiguration overridden by the value passed in.
292//
293// The protocol ID in TConfiguration is only useful for client transports.
294// For servers,
295// the protocol ID will be overridden again to the one set by the client,
296// to ensure that servers always speak the same dialect as the client.
297func NewTHeaderTransportConf(trans TTransport, conf *TConfiguration) *THeaderTransport {
298 if ht, ok := trans.(*THeaderTransport); ok {
299 ht.SetTConfiguration(conf)
300 return ht
301 }
302 PropagateTConfiguration(trans, conf)
303 return &THeaderTransport{
304 transport: trans,
305 reader: bufio.NewReader(trans),
306 writeHeaders: make(THeaderMap),
307 protocolID: conf.GetTHeaderProtocolID(),
308 cfg: conf,
309 }
310}
311
312// Open calls the underlying transport's Open function.
313func (t *THeaderTransport) Open() error {
314 return t.transport.Open()
315}
316
317// IsOpen calls the underlying transport's IsOpen function.
318func (t *THeaderTransport) IsOpen() bool {
319 return t.transport.IsOpen()
320}
321
322// ReadFrame tries to read the frame header, guess the client type, and handle
323// unframed clients.
324func (t *THeaderTransport) ReadFrame(ctx context.Context) error {
325 if !t.needReadFrame() {
326 // No need to read frame, skipping.
327 return nil
328 }
329
330 // Peek and handle the first 32 bits.
331 // They could either be the length field of a framed message,
332 // or the first bytes of an unframed message.
333 var buf []byte
334 var err error
335 // This is also usually the first read from a connection,
336 // so handle retries around socket timeouts.
337 _, deadlineSet := ctx.Deadline()
338 for {
339 buf, err = t.reader.Peek(size32)
340 if deadlineSet && isTimeoutError(err) && ctx.Err() == nil {
341 // This is I/O timeout and we still have time,
342 // continue trying
343 continue
344 }
345 // For anything else, do not retry
346 break
347 }
348 if err != nil {
349 return err
350 }
351
352 frameSize := binary.BigEndian.Uint32(buf)
353 if frameSize&VERSION_MASK == VERSION_1 {
354 t.clientType = clientUnframedBinary
355 return nil
356 }
357 if buf[0] == COMPACT_PROTOCOL_ID && buf[1]&COMPACT_VERSION_MASK == COMPACT_VERSION {
358 t.clientType = clientUnframedCompact
359 return nil
360 }
361
362 // At this point it should be a framed message,
363 // sanity check on frameSize then discard the peeked part.
364 if frameSize > THeaderMaxFrameSize || frameSize > uint32(t.cfg.GetMaxFrameSize()) {
365 return NewTProtocolExceptionWithType(
366 SIZE_LIMIT,
367 errors.New("frame too large"),
368 )
369 }
370 t.reader.Discard(size32)
371
372 // Read the frame fully into frameBuffer.
373 _, err = io.CopyN(&t.frameBuffer, t.reader, int64(frameSize))
374 if err != nil {
375 return err
376 }
377 t.frameReader = ioutil.NopCloser(&t.frameBuffer)
378
379 // Peek and handle the next 32 bits.
380 buf = t.frameBuffer.Bytes()[:size32]
381 version := binary.BigEndian.Uint32(buf)
382 if version&THeaderHeaderMask == THeaderHeaderMagic {
383 t.clientType = clientHeaders
384 return t.parseHeaders(ctx, frameSize)
385 }
386 if version&VERSION_MASK == VERSION_1 {
387 t.clientType = clientFramedBinary
388 return nil
389 }
390 if buf[0] == COMPACT_PROTOCOL_ID && buf[1]&COMPACT_VERSION_MASK == COMPACT_VERSION {
391 t.clientType = clientFramedCompact
392 return nil
393 }
394 if err := t.endOfFrame(); err != nil {
395 return err
396 }
397 return NewTProtocolExceptionWithType(
398 NOT_IMPLEMENTED,
399 errors.New("unsupported client transport type"),
400 )
401}
402
403// endOfFrame does end of frame handling.
404//
405// It closes frameReader, and also resets frame related states.
406func (t *THeaderTransport) endOfFrame() error {
407 defer func() {
408 t.frameBuffer.Reset()
409 t.frameReader = nil
410 }()
411 return t.frameReader.Close()
412}
413
414func (t *THeaderTransport) parseHeaders(ctx context.Context, frameSize uint32) error {
415 if t.clientType != clientHeaders {
416 return nil
417 }
418
419 var err error
420 var meta headerMeta
421 if err = binary.Read(&t.frameBuffer, binary.BigEndian, &meta); err != nil {
422 return err
423 }
424 frameSize -= headerMetaSize
425 t.Flags = meta.MagicFlags & THeaderFlagsMask
426 t.SequenceID = meta.SequenceID
427 headerLength := int64(meta.HeaderLength) * 4
428 if int64(frameSize) < headerLength {
429 return NewTProtocolExceptionWithType(
430 SIZE_LIMIT,
431 errors.New("header size is larger than the whole frame"),
432 )
433 }
434 headerBuf := NewTMemoryBuffer()
435 _, err = io.CopyN(headerBuf, &t.frameBuffer, headerLength)
436 if err != nil {
437 return err
438 }
439 hp := NewTCompactProtocol(headerBuf)
440 hp.SetTConfiguration(t.cfg)
441
442 // At this point the header is already read into headerBuf,
443 // and t.frameBuffer starts from the actual payload.
444 protoID, err := hp.readVarint32()
445 if err != nil {
446 return err
447 }
448 t.protocolID = THeaderProtocolID(protoID)
449
450 var transformCount int32
451 transformCount, err = hp.readVarint32()
452 if err != nil {
453 return err
454 }
455 if transformCount > 0 {
456 reader := NewTransformReaderWithCapacity(
457 &t.frameBuffer,
458 int(transformCount),
459 )
460 t.frameReader = reader
461 transformIDs := make([]THeaderTransformID, transformCount)
462 for i := 0; i < int(transformCount); i++ {
463 id, err := hp.readVarint32()
464 if err != nil {
465 return err
466 }
467 transformIDs[i] = THeaderTransformID(id)
468 }
469 // The transform IDs on the wire was added based on the order of
470 // writing, so on the reading side we need to reverse the order.
471 for i := transformCount - 1; i >= 0; i-- {
472 id := transformIDs[i]
473 if err := reader.AddTransform(id); err != nil {
474 return err
475 }
476 }
477 }
478
479 // The info part does not use the transforms yet, so it's
480 // important to continue using headerBuf.
481 headers := make(THeaderMap)
482 for {
483 infoType, err := hp.readVarint32()
484 if errors.Is(err, io.EOF) {
485 break
486 }
487 if err != nil {
488 return err
489 }
490 if THeaderInfoType(infoType) == InfoKeyValue {
491 count, err := hp.readVarint32()
492 if err != nil {
493 return err
494 }
495 for i := 0; i < int(count); i++ {
496 key, err := hp.ReadString(ctx)
497 if err != nil {
498 return err
499 }
500 value, err := hp.ReadString(ctx)
501 if err != nil {
502 return err
503 }
504 headers[key] = value
505 }
506 } else {
507 // Skip reading info section on the first
508 // unsupported info type.
509 break
510 }
511 }
512 t.readHeaders = headers
513
514 return nil
515}
516
517func (t *THeaderTransport) needReadFrame() bool {
518 if t.clientType == clientUnknown {
519 // This is a new connection that's never read before.
520 return true
521 }
522 if t.isFramed() && t.frameReader == nil {
523 // We just finished the last frame.
524 return true
525 }
526 return false
527}
528
529func (t *THeaderTransport) Read(p []byte) (read int, err error) {
530 // Here using context.Background instead of a context passed in is safe.
531 // First is that there's no way to pass context into this function.
532 // Then, 99% of the case when calling this Read frame is already read
533 // into frameReader. ReadFrame here is more of preventing bugs that
534 // didn't call ReadFrame before calling Read.
535 err = t.ReadFrame(context.Background())
536 if err != nil {
537 return
538 }
539 if t.frameReader != nil {
540 read, err = t.frameReader.Read(p)
541 if err == nil && t.frameBuffer.Len() <= 0 {
542 // the last Read finished the frame, do endOfFrame
543 // handling here.
544 err = t.endOfFrame()
545 } else if err == io.EOF {
546 err = t.endOfFrame()
547 if err != nil {
548 return
549 }
550 if read == 0 {
551 // Try to read the next frame when we hit EOF
552 // (end of frame) immediately.
553 // When we got here, it means the last read
554 // finished the previous frame, but didn't
555 // do endOfFrame handling yet.
556 // We have to read the next frame here,
557 // as otherwise we would return 0 and nil,
558 // which is a case not handled well by most
559 // protocol implementations.
560 return t.Read(p)
561 }
562 }
563 return
564 }
565 return t.reader.Read(p)
566}
567
568// Write writes data to the write buffer.
569//
570// You need to call Flush to actually write them to the transport.
571func (t *THeaderTransport) Write(p []byte) (int, error) {
572 return t.writeBuffer.Write(p)
573}
574
575// Flush writes the appropriate header and the write buffer to the underlying transport.
576func (t *THeaderTransport) Flush(ctx context.Context) error {
577 if t.writeBuffer.Len() == 0 {
578 return nil
579 }
580
581 defer t.writeBuffer.Reset()
582
583 switch t.clientType {
584 default:
585 fallthrough
586 case clientUnknown:
587 t.clientType = clientHeaders
588 fallthrough
589 case clientHeaders:
590 headers := NewTMemoryBuffer()
591 hp := NewTCompactProtocol(headers)
592 hp.SetTConfiguration(t.cfg)
593 if _, err := hp.writeVarint32(int32(t.protocolID)); err != nil {
594 return NewTTransportExceptionFromError(err)
595 }
596 if _, err := hp.writeVarint32(int32(len(t.writeTransforms))); err != nil {
597 return NewTTransportExceptionFromError(err)
598 }
599 for _, transform := range t.writeTransforms {
600 if _, err := hp.writeVarint32(int32(transform)); err != nil {
601 return NewTTransportExceptionFromError(err)
602 }
603 }
604 if len(t.writeHeaders) > 0 {
605 if _, err := hp.writeVarint32(int32(InfoKeyValue)); err != nil {
606 return NewTTransportExceptionFromError(err)
607 }
608 if _, err := hp.writeVarint32(int32(len(t.writeHeaders))); err != nil {
609 return NewTTransportExceptionFromError(err)
610 }
611 for key, value := range t.writeHeaders {
612 if err := hp.WriteString(ctx, key); err != nil {
613 return NewTTransportExceptionFromError(err)
614 }
615 if err := hp.WriteString(ctx, value); err != nil {
616 return NewTTransportExceptionFromError(err)
617 }
618 }
619 }
620 padding := 4 - headers.Len()%4
621 if padding < 4 {
622 buf := t.buffer[:padding]
623 for i := range buf {
624 buf[i] = 0
625 }
626 if _, err := headers.Write(buf); err != nil {
627 return NewTTransportExceptionFromError(err)
628 }
629 }
630
631 var payload bytes.Buffer
632 meta := headerMeta{
633 MagicFlags: THeaderHeaderMagic + t.Flags&THeaderFlagsMask,
634 SequenceID: t.SequenceID,
635 HeaderLength: uint16(headers.Len() / 4),
636 }
637 if err := binary.Write(&payload, binary.BigEndian, meta); err != nil {
638 return NewTTransportExceptionFromError(err)
639 }
640 if _, err := io.Copy(&payload, headers); err != nil {
641 return NewTTransportExceptionFromError(err)
642 }
643
644 writer, err := NewTransformWriter(&payload, t.writeTransforms)
645 if err != nil {
646 return NewTTransportExceptionFromError(err)
647 }
648 if _, err := io.Copy(writer, &t.writeBuffer); err != nil {
649 return NewTTransportExceptionFromError(err)
650 }
651 if err := writer.Close(); err != nil {
652 return NewTTransportExceptionFromError(err)
653 }
654
655 // First write frame length
656 buf := t.buffer[:size32]
657 binary.BigEndian.PutUint32(buf, uint32(payload.Len()))
658 if _, err := t.transport.Write(buf); err != nil {
659 return NewTTransportExceptionFromError(err)
660 }
661 // Then write the payload
662 if _, err := io.Copy(t.transport, &payload); err != nil {
663 return NewTTransportExceptionFromError(err)
664 }
665
666 case clientFramedBinary, clientFramedCompact:
667 buf := t.buffer[:size32]
668 binary.BigEndian.PutUint32(buf, uint32(t.writeBuffer.Len()))
669 if _, err := t.transport.Write(buf); err != nil {
670 return NewTTransportExceptionFromError(err)
671 }
672 fallthrough
673 case clientUnframedBinary, clientUnframedCompact:
674 if _, err := io.Copy(t.transport, &t.writeBuffer); err != nil {
675 return NewTTransportExceptionFromError(err)
676 }
677 }
678
679 select {
680 default:
681 case <-ctx.Done():
682 return NewTTransportExceptionFromError(ctx.Err())
683 }
684
685 return t.transport.Flush(ctx)
686}
687
688// Close closes the transport, along with its underlying transport.
689func (t *THeaderTransport) Close() error {
690 if err := t.Flush(context.Background()); err != nil {
691 return err
692 }
693 return t.transport.Close()
694}
695
696// RemainingBytes calls underlying transport's RemainingBytes.
697//
698// Even in framed cases, because of all the possible compression transforms
699// involved, the remaining frame size is likely to be different from the actual
700// remaining readable bytes, so we don't bother to keep tracking the remaining
701// frame size by ourselves and just use the underlying transport's
702// RemainingBytes directly.
703func (t *THeaderTransport) RemainingBytes() uint64 {
704 return t.transport.RemainingBytes()
705}
706
707// GetReadHeaders returns the THeaderMap read from transport.
708func (t *THeaderTransport) GetReadHeaders() THeaderMap {
709 return t.readHeaders
710}
711
712// SetWriteHeader sets a header for write.
713func (t *THeaderTransport) SetWriteHeader(key, value string) {
714 t.writeHeaders[key] = value
715}
716
717// ClearWriteHeaders clears all write headers previously set.
718func (t *THeaderTransport) ClearWriteHeaders() {
719 t.writeHeaders = make(THeaderMap)
720}
721
722// AddTransform add a transform for writing.
723func (t *THeaderTransport) AddTransform(transform THeaderTransformID) error {
724 if !supportedTransformIDs[transform] {
725 return NewTProtocolExceptionWithType(
726 NOT_IMPLEMENTED,
727 fmt.Errorf("THeaderTransformID %d not supported", transform),
728 )
729 }
730 t.writeTransforms = append(t.writeTransforms, transform)
731 return nil
732}
733
734// Protocol returns the wrapped protocol id used in this THeaderTransport.
735func (t *THeaderTransport) Protocol() THeaderProtocolID {
736 switch t.clientType {
737 default:
738 return t.protocolID
739 case clientFramedBinary, clientUnframedBinary:
740 return THeaderProtocolBinary
741 case clientFramedCompact, clientUnframedCompact:
742 return THeaderProtocolCompact
743 }
744}
745
746func (t *THeaderTransport) isFramed() bool {
747 switch t.clientType {
748 default:
749 return false
750 case clientHeaders, clientFramedBinary, clientFramedCompact:
751 return true
752 }
753}
754
755// SetTConfiguration implements TConfigurationSetter.
756func (t *THeaderTransport) SetTConfiguration(cfg *TConfiguration) {
757 PropagateTConfiguration(t.transport, cfg)
758 t.cfg = cfg
759}
760
761// THeaderTransportFactory is a TTransportFactory implementation to create
762// THeaderTransport.
763//
764// It also implements TConfigurationSetter.
765type THeaderTransportFactory struct {
766 // The underlying factory, could be nil.
767 Factory TTransportFactory
768
769 cfg *TConfiguration
770}
771
772// Deprecated: Use NewTHeaderTransportFactoryConf instead.
773func NewTHeaderTransportFactory(factory TTransportFactory) TTransportFactory {
774 return NewTHeaderTransportFactoryConf(factory, &TConfiguration{
775 noPropagation: true,
776 })
777}
778
779// NewTHeaderTransportFactoryConf creates a new *THeaderTransportFactory with
780// the given *TConfiguration.
781func NewTHeaderTransportFactoryConf(factory TTransportFactory, conf *TConfiguration) TTransportFactory {
782 return &THeaderTransportFactory{
783 Factory: factory,
784
785 cfg: conf,
786 }
787}
788
789// GetTransport implements TTransportFactory.
790func (f *THeaderTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
791 if f.Factory != nil {
792 t, err := f.Factory.GetTransport(trans)
793 if err != nil {
794 return nil, err
795 }
796 return NewTHeaderTransportConf(t, f.cfg), nil
797 }
798 return NewTHeaderTransportConf(trans, f.cfg), nil
799}
800
801// SetTConfiguration implements TConfigurationSetter.
802func (f *THeaderTransportFactory) SetTConfiguration(cfg *TConfiguration) {
803 PropagateTConfiguration(f.Factory, f.cfg)
804 f.cfg = cfg
805}
806
807var (
808 _ TConfigurationSetter = (*THeaderTransportFactory)(nil)
809 _ TConfigurationSetter = (*THeaderTransport)(nil)
810)