blob: 82e34119311f881134214f2dee86e967f2ca032c [file] [log] [blame]
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package adapter
16
17import (
18 "context"
19
20 "google.golang.org/grpc"
21 "google.golang.org/grpc/metadata"
22)
23
24// chanServerStream implements grpc.ServerStream with a chanStream
25type chanServerStream struct {
26 headerc chan<- metadata.MD
27 trailerc chan<- metadata.MD
28 grpc.Stream
29
30 headers []metadata.MD
31}
32
33func (ss *chanServerStream) SendHeader(md metadata.MD) error {
34 if ss.headerc == nil {
35 return errAlreadySentHeader
36 }
37 outmd := make(map[string][]string)
38 for _, h := range append(ss.headers, md) {
39 for k, v := range h {
40 outmd[k] = v
41 }
42 }
43 select {
44 case ss.headerc <- outmd:
45 ss.headerc = nil
46 ss.headers = nil
47 return nil
48 case <-ss.Context().Done():
49 }
50 return ss.Context().Err()
51}
52
53func (ss *chanServerStream) SetHeader(md metadata.MD) error {
54 if ss.headerc == nil {
55 return errAlreadySentHeader
56 }
57 ss.headers = append(ss.headers, md)
58 return nil
59}
60
61func (ss *chanServerStream) SetTrailer(md metadata.MD) {
62 ss.trailerc <- md
63}
64
65// chanClientStream implements grpc.ClientStream with a chanStream
66type chanClientStream struct {
67 headerc <-chan metadata.MD
68 trailerc <-chan metadata.MD
69 *chanStream
70}
71
72func (cs *chanClientStream) Header() (metadata.MD, error) {
73 select {
74 case md := <-cs.headerc:
75 return md, nil
76 case <-cs.Context().Done():
77 }
78 return nil, cs.Context().Err()
79}
80
81func (cs *chanClientStream) Trailer() metadata.MD {
82 select {
83 case md := <-cs.trailerc:
84 return md
85 case <-cs.Context().Done():
86 return nil
87 }
88}
89
90func (cs *chanClientStream) CloseSend() error {
91 close(cs.chanStream.sendc)
92 return nil
93}
94
// chanStream implements grpc.Stream using channels: one channel for incoming
// messages, one for outgoing messages, and a context that bounds both
// directions.
type chanStream struct {
	// recvc is the channel RecvMsg reads incoming messages from.
	recvc <-chan interface{}
	// sendc is the channel SendMsg writes outgoing messages to.
	sendc chan<- interface{}
	// ctx is returned by Context and unblocks SendMsg/RecvMsg when done.
	ctx context.Context
	// cancel is the cancel function stored alongside ctx.
	cancel context.CancelFunc
}
102
103func (s *chanStream) Context() context.Context { return s.ctx }
104
105func (s *chanStream) SendMsg(m interface{}) error {
106 select {
107 case s.sendc <- m:
108 if err, ok := m.(error); ok {
109 return err
110 }
111 return nil
112 case <-s.ctx.Done():
113 }
114 return s.ctx.Err()
115}
116
117func (s *chanStream) RecvMsg(m interface{}) error {
118 v := m.(*interface{})
119 for {
120 select {
121 case msg, ok := <-s.recvc:
122 if !ok {
123 return grpc.ErrClientConnClosing
124 }
125 if err, ok := msg.(error); ok {
126 return err
127 }
128 *v = msg
129 return nil
130 case <-s.ctx.Done():
131 }
132 if len(s.recvc) == 0 {
133 // prioritize any pending recv messages over canceled context
134 break
135 }
136 }
137 return s.ctx.Err()
138}
139
140func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream {
141 // ch1 is buffered so server can send error on close
142 ch1, ch2 := make(chan interface{}, 1), make(chan interface{})
143 headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
144
145 cctx, ccancel := context.WithCancel(ctx)
146 cli := &chanStream{recvc: ch1, sendc: ch2, ctx: cctx, cancel: ccancel}
147 cs := chanClientStream{headerc, trailerc, cli}
148
149 sctx, scancel := context.WithCancel(ctx)
150 srv := &chanStream{recvc: ch2, sendc: ch1, ctx: sctx, cancel: scancel}
151 ss := chanServerStream{headerc, trailerc, srv, nil}
152
153 go func() {
154 if err := ssHandler(ss); err != nil {
155 select {
156 case srv.sendc <- err:
157 case <-sctx.Done():
158 case <-cctx.Done():
159 }
160 }
161 scancel()
162 ccancel()
163 }()
164 return cs
165}