blob: 82269d2b84610acf9d73c37a0a23820d91a3d716 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "errors"
21 "fmt"
22 "github.com/opencord/voltha-go/common/log"
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/codes"
25 "net"
26 "net/url"
27 "strconv"
28)
29
30var (
31 clientStreamDescForProxying = &grpc.StreamDesc{
32 ServerStreams: true,
33 ClientStreams: true,
34 }
35)
36
37type server struct {
38 running bool
39 name string
40 proxyListener net.Listener
41 routers map[string]map[string]Router
42 proxyServer *grpc.Server
43}
44
45func newServer(config *ServerConfig) (*server, error) {
46 var err error = nil
47 var rtrn_err = false
48 var s *server
49 // Change over to the new configuration format
50 // Validate the configuration
51 // There should be a name
52 if config.Name == "" {
53 log.Error("A server has been defined with no name")
54 rtrn_err = true
55 }
56 // Validate that there's a port specified
57 if config.Port == 0 {
58 log.Errorf("Server %s does not have a valid port assigned", config.Name)
59 rtrn_err = true
60 }
61 // Validate the ip address if one is provided
62 if _, err := url.Parse(config.Addr); err != nil {
63 log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
64 rtrn_err = true
65 }
66 if config.Type != "grpc" && config.Type != "streaming_grpc" {
67 if config.Type == "" {
68 log.Errorf("A server 'type' must be defined for server %s", config.Name)
69 } else {
70 log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
71 "but '%s' was found for server '%s'", config.Type, config.Name)
72 }
73 rtrn_err = true
74 }
75 if len(config.Routers) == 0 {
76 log.Errorf("At least one router must be specified for server '%s'", config.Name)
77 rtrn_err = true
78 }
79
80 if rtrn_err {
81 return nil, errors.New("Server configuration failed")
82 } else {
83 // The configuration is valid, create a server and configure it.
84 s = &server{name: config.Name, routers: make(map[string]map[string]Router)}
85 // The listener
86 if s.proxyListener, err =
87 net.Listen("tcp", config.Addr+":"+
88 strconv.Itoa(int(config.Port))); err != nil {
89 log.Error(err)
90 return nil, err
91 }
92 // Create the routers
93 log.Debugf("Configuring the routers for server %s", s.name)
94 for p, r := range config.routers {
95 log.Debugf("Processing router %s for package %s", r.Name, p)
96 if dr, err := newRouter(r); err != nil {
97 log.Error(err)
98 return nil, err
99 } else {
100 log.Debugf("Adding router %s to the server %s for package %s and service %s",
101 dr.Name(), s.name, p, dr.Service())
102 if _, ok := s.routers[p]; ok {
103 s.routers[p][dr.Service()] = dr
104 } else {
105 s.routers[p] = make(map[string]Router)
106 s.routers[p][dr.Service()] = dr
107 }
108 }
109 }
110 // Configure the grpc handler
111 s.proxyServer = grpc.NewServer(
112 grpc.CustomCodec(Codec()),
113 grpc.UnknownServiceHandler(s.TransparentHandler()),
114 )
115
116 }
117 return s, nil
118}
119
120func (s *server) Name() string {
121 return s.name
122}
123
124func (s *server) TransparentHandler() grpc.StreamHandler {
125 return s.handler
126}
127
128func (s *server) getRouter(pkg string, service string) (Router, bool) {
129 if fn, ok := s.routers[pkg][service]; ok { // Both specified
130 return fn, ok
131 } else if fn, ok = s.routers["*"][service]; ok { // Package wild card
132 return fn, ok
133 } else if fn, ok = s.routers[pkg]["*"]; ok { // Service wild card
134 return fn, ok
135 } else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
136 return fn, ok
137 } else {
138 return nil, false
139 }
140}
141
142func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
143 // Determine what router is intended to handle this request
144 fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
145 if !ok {
146 return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
147 }
148 log.Debugf("\n\nProcessing grpc request %s on server %s", fullMethodName, s.name)
149 methodInfo := newMethodDetails(fullMethodName)
150 r, ok := s.getRouter(methodInfo.pkg, methodInfo.service)
151 //fn, ok := s.routers[methodInfo.pkg][methodInfo.service]
152 if !ok {
153 // TODO: Should this be punted to a default transparent router??
154 // Probably not, if one is defined yes otherwise just crap out.
155
156 err := fmt.Errorf("Unable to dispatch! Service '%s' for package '%s' not found.", methodInfo.service, methodInfo.pkg)
157 log.Error(err)
158 return err
159 }
160 log.Debugf("Selected router %s", r.Name())
161
162 mk, mv, err := r.GetMetaKeyVal(serverStream)
163 if err != nil {
164 log.Error(err)
165 return err
166 }
167
168 //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,methodInfo:methodInfo,metaKey:mk,metaVal:mv)
169
170 // Extract the cluster from the selected router and use it to manage the transfer
171 if cluster, err := r.BackendCluster(methodInfo.method, mk); err != nil {
172 return err
173 } else {
174 //return beCluster.handler(nbR)
175 return cluster.handler(srv, serverStream, r, methodInfo, mk, mv)
176 }
177}