blob: 0b4c03ff44736e06da3663129e501a994ac90b87 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouter
18
19import (
20 "errors"
21 "fmt"
22 "github.com/opencord/voltha-go/common/log"
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/codes"
Scott Baker4989fe92019-10-09 17:03:06 -070025 "google.golang.org/grpc/status"
Scott Bakere7144bc2019-10-01 14:16:47 -070026 "net"
27 "net/url"
28 "strconv"
29)
30
31var (
32 clientStreamDescForProxying = &grpc.StreamDesc{
33 ServerStreams: true,
34 ClientStreams: true,
35 }
36)
37
38type server struct {
39 running bool
40 name string
41 proxyListener net.Listener
42 routers map[string]map[string]Router
43 proxyServer *grpc.Server
44}
45
46func newServer(config *ServerConfig) (*server, error) {
Scott Baker4989fe92019-10-09 17:03:06 -070047 var err error
Scott Bakere7144bc2019-10-01 14:16:47 -070048 var rtrn_err = false
49 var s *server
50 // Change over to the new configuration format
51 // Validate the configuration
52 // There should be a name
53 if config.Name == "" {
54 log.Error("A server has been defined with no name")
55 rtrn_err = true
56 }
57 // Validate that there's a port specified
58 if config.Port == 0 {
59 log.Errorf("Server %s does not have a valid port assigned", config.Name)
60 rtrn_err = true
61 }
62 // Validate the ip address if one is provided
63 if _, err := url.Parse(config.Addr); err != nil {
64 log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
65 rtrn_err = true
66 }
67 if config.Type != "grpc" && config.Type != "streaming_grpc" {
68 if config.Type == "" {
69 log.Errorf("A server 'type' must be defined for server %s", config.Name)
70 } else {
71 log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
72 "but '%s' was found for server '%s'", config.Type, config.Name)
73 }
74 rtrn_err = true
75 }
76 if len(config.Routers) == 0 {
77 log.Errorf("At least one router must be specified for server '%s'", config.Name)
78 rtrn_err = true
79 }
80
81 if rtrn_err {
82 return nil, errors.New("Server configuration failed")
83 } else {
84 // The configuration is valid, create a server and configure it.
85 s = &server{name: config.Name, routers: make(map[string]map[string]Router)}
86 // The listener
87 if s.proxyListener, err =
88 net.Listen("tcp", config.Addr+":"+
89 strconv.Itoa(int(config.Port))); err != nil {
90 log.Error(err)
91 return nil, err
92 }
93 // Create the routers
94 log.Debugf("Configuring the routers for server %s", s.name)
95 for p, r := range config.routers {
96 log.Debugf("Processing router %s for package %s", r.Name, p)
97 if dr, err := newRouter(r); err != nil {
98 log.Error(err)
99 return nil, err
100 } else {
101 log.Debugf("Adding router %s to the server %s for package %s and service %s",
102 dr.Name(), s.name, p, dr.Service())
103 if _, ok := s.routers[p]; ok {
104 s.routers[p][dr.Service()] = dr
105 } else {
106 s.routers[p] = make(map[string]Router)
107 s.routers[p][dr.Service()] = dr
108 }
109 }
110 }
Scott Baker4989fe92019-10-09 17:03:06 -0700111
Scott Bakere7144bc2019-10-01 14:16:47 -0700112 // Configure the grpc handler
113 s.proxyServer = grpc.NewServer(
114 grpc.CustomCodec(Codec()),
115 grpc.UnknownServiceHandler(s.TransparentHandler()),
116 )
117
118 }
119 return s, nil
120}
121
122func (s *server) Name() string {
123 return s.name
124}
125
126func (s *server) TransparentHandler() grpc.StreamHandler {
127 return s.handler
128}
129
130func (s *server) getRouter(pkg string, service string) (Router, bool) {
131 if fn, ok := s.routers[pkg][service]; ok { // Both specified
132 return fn, ok
133 } else if fn, ok = s.routers["*"][service]; ok { // Package wild card
134 return fn, ok
135 } else if fn, ok = s.routers[pkg]["*"]; ok { // Service wild card
136 return fn, ok
137 } else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
138 return fn, ok
139 } else {
140 return nil, false
141 }
142}
143
144func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
145 // Determine what router is intended to handle this request
146 fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
147 if !ok {
Scott Baker4989fe92019-10-09 17:03:06 -0700148 return status.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
Scott Bakere7144bc2019-10-01 14:16:47 -0700149 }
150 log.Debugf("\n\nProcessing grpc request %s on server %s", fullMethodName, s.name)
151 methodInfo := newMethodDetails(fullMethodName)
152 r, ok := s.getRouter(methodInfo.pkg, methodInfo.service)
153 //fn, ok := s.routers[methodInfo.pkg][methodInfo.service]
154 if !ok {
155 // TODO: Should this be punted to a default transparent router??
156 // Probably not, if one is defined yes otherwise just crap out.
157
158 err := fmt.Errorf("Unable to dispatch! Service '%s' for package '%s' not found.", methodInfo.service, methodInfo.pkg)
159 log.Error(err)
160 return err
161 }
162 log.Debugf("Selected router %s", r.Name())
163
164 mk, mv, err := r.GetMetaKeyVal(serverStream)
165 if err != nil {
166 log.Error(err)
167 return err
168 }
169
170 //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,methodInfo:methodInfo,metaKey:mk,metaVal:mv)
171
172 // Extract the cluster from the selected router and use it to manage the transfer
173 if cluster, err := r.BackendCluster(methodInfo.method, mk); err != nil {
174 return err
175 } else {
176 //return beCluster.handler(nbR)
177 return cluster.handler(srv, serverStream, r, methodInfo, mk, mv)
178 }
179}