blob: bb7faeadedea445e91b60a5dd4ca685892bb3fdf [file] [log] [blame]
Kent Hagerman1e9061e2019-05-21 16:01:21 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "context"
Scott Bakerc69d4dc2019-08-09 12:25:46 -070021 "encoding/hex"
Kent Hagerman1e9061e2019-05-21 16:01:21 -040022 "errors"
23 "github.com/opencord/voltha-go/common/log"
24 "google.golang.org/grpc"
25 "google.golang.org/grpc/metadata"
26 "sort"
27 "sync"
28)
29
30type streams struct {
31 mutex sync.Mutex
32 activeStream *stream
33 streams map[string]*stream
34 sortedStreams []*stream
35}
36
37type stream struct {
38 stream grpc.ClientStream
39 ctx context.Context
40 cancel context.CancelFunc
41 ok2Close chan struct{}
42 c2sReturn chan error
43 s2cReturn error
44}
45
46func (s *streams) clientCancel() {
47 for _, strm := range s.streams {
48 if strm != nil {
49 strm.cancel()
50 }
51 }
52}
53
54func (s *streams) closeSend() {
55 for _, strm := range s.streams {
56 if strm != nil {
57 <-strm.ok2Close
58 log.Debug("Closing southbound stream")
59 strm.stream.CloseSend()
60 }
61 }
62}
63
64func (s *streams) trailer() metadata.MD {
65 return s.activeStream.stream.Trailer()
66}
67
68func (s *streams) getActive() *stream {
69 s.mutex.Lock()
70 defer s.mutex.Unlock()
71 return s.activeStream
72}
73
74func (s *streams) setThenGetActive(strm *stream) *stream {
75 s.mutex.Lock()
76 defer s.mutex.Unlock()
77 if s.activeStream == nil {
78 s.activeStream = strm
79 }
80 return s.activeStream
81}
82
83func (s *streams) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
84 fc2s := func(srcS *stream) {
85 for i := 0; ; i++ {
86 if err := srcS.stream.RecvMsg(f); err != nil {
87 if s.setThenGetActive(srcS) == srcS {
88 srcS.c2sReturn <- err // this can be io.EOF which is the success case
89 } else {
90 srcS.c2sReturn <- nil // Inactive responder
91 }
92 close(srcS.ok2Close)
93 break
94 }
95 if s.setThenGetActive(srcS) != srcS {
96 srcS.c2sReturn <- nil
97 continue
98 }
99 if i == 0 {
100 // This is a bit of a hack, but client to server headers are only readable after first client msg is
101 // received but must be written to server stream before the first msg is flushed.
102 // This is the only place to do it nicely.
103 md, err := srcS.stream.Header()
104 if err != nil {
105 srcS.c2sReturn <- err
106 break
107 }
108 // Update the metadata for the response.
109 if f.metaKey != NoMeta {
110 if f.metaVal == "" {
111 // We could also alsways just do this
112 md.Set(f.metaKey, f.backend.name)
113 } else {
114 md.Set(f.metaKey, f.metaVal)
115 }
116 }
117 if err := dst.SendHeader(md); err != nil {
118 srcS.c2sReturn <- err
119 break
120 }
121 }
Scott Bakerc69d4dc2019-08-09 12:25:46 -0700122 log.Debugf("Northbound frame %s", hex.EncodeToString(f.payload))
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400123 if err := dst.SendMsg(f); err != nil {
124 srcS.c2sReturn <- err
125 break
126 }
127 }
128 }
129
130 // There should be AT LEAST one open stream at this point
131 // if there isn't its a grave error in the code and it will
132 // cause this thread to block here so check for it and
133 // don't let the lock up happen but report the error
134 ret := make(chan error, 1)
135 agg := make(chan *stream)
136 atLeastOne := false
137 for _, strm := range s.streams {
138 if strm != nil {
139 go fc2s(strm)
140 go func(s *stream) { // Wait on result and aggregate
141 r := <-s.c2sReturn // got the return code
142 if r == nil {
143 return // We're the redundant stream, just die
144 }
145 s.c2sReturn <- r // put it back to pass it along
146 agg <- s // send the stream to the aggregator
147 }(strm)
148 atLeastOne = true
149 }
150 }
151 if atLeastOne == true {
152 go func() { // Wait on aggregated result
153 s := <-agg
154 ret <- <-s.c2sReturn
155 }()
156 } else {
157 err := errors.New("There are no open streams. Unable to forward message.")
158 log.Error(err)
159 ret <- err
160 }
161 return ret
162}
163
164func (s *streams) sendAll(f *nbFrame) error {
165 var rtrn error
166
167 atLeastOne := false
168 for _, strm := range s.sortedStreams {
169 if strm != nil {
170 if err := strm.stream.SendMsg(f); err != nil {
171 log.Debugf("Error on SendMsg: %s", err.Error())
172 strm.s2cReturn = err
173 }
174 atLeastOne = true
175 } else {
176 log.Debugf("Nil stream")
177 }
178 }
179 // If one of the streams succeeded, declare success
180 // if none did pick an error and return it.
181 if atLeastOne == true {
182 for _, strm := range s.sortedStreams {
183 if strm != nil {
184 rtrn = strm.s2cReturn
185 if rtrn == nil {
186 return rtrn
187 }
188 }
189 }
190 return rtrn
191 } else {
192 rtrn = errors.New("There are no open streams, this should never happen")
193 log.Error(rtrn)
194 }
195 return rtrn
196}
197
198func (s *streams) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
199 ret := make(chan error, 1)
200 go func() {
201 // The frame buffer already has the results of a first
202 // RecvMsg in it so the first thing to do is to
203 // send it to the list of client streams and only
204 // then read some more.
205 for i := 0; ; i++ {
206 // Send the message to each of the backend streams
207 if err := s.sendAll(f); err != nil {
208 ret <- err
209 log.Debugf("SendAll failed %s", err.Error())
210 break
211 }
Scott Bakerc69d4dc2019-08-09 12:25:46 -0700212 log.Debugf("Southbound frame %s", hex.EncodeToString(f.payload))
Kent Hagerman1e9061e2019-05-21 16:01:21 -0400213 if err := src.RecvMsg(f); err != nil {
214 ret <- err // this can be io.EOF which is happy case
215 break
216 }
217 }
218 }()
219 return ret
220}
221
222func (s *streams) sortStreams() {
223 var tmpKeys []string
224 for k := range s.streams {
225 tmpKeys = append(tmpKeys, k)
226 }
227 sort.Strings(tmpKeys)
228 for _, v := range tmpKeys {
229 s.sortedStreams = append(s.sortedStreams, s.streams[v])
230 }
231}