/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
| |
| // Package resolver provides internal resolver-related functionality. |
| package resolver |
| |
| import ( |
| "context" |
| "sync" |
| |
| "google.golang.org/grpc/internal/serviceconfig" |
| "google.golang.org/grpc/metadata" |
| "google.golang.org/grpc/resolver" |
| ) |
| |
| // ConfigSelector controls what configuration to use for every RPC. |
| type ConfigSelector interface { |
| // Selects the configuration for the RPC, or terminates it using the error. |
| // This error will be converted by the gRPC library to a status error with |
| // code UNKNOWN if it is not returned as a status error. |
| SelectConfig(RPCInfo) (*RPCConfig, error) |
| } |
| |
// RPCInfo carries the per-RPC information a ConfigSelector needs.
type RPCInfo struct {
	// Context is the user's context for the RPC; it contains the headers and
	// the application timeout. It is passed along for interception purposes
	// and for efficiency reasons. SelectConfig should not block.
	Context context.Context
	// Method is the full method name, e.g. "/Service/Method".
	Method string
}
| |
| // RPCConfig describes the configuration to use for each RPC. |
| type RPCConfig struct { |
| // The context to use for the remainder of the RPC; can pass info to LB |
| // policy or affect timeout or metadata. |
| Context context.Context |
| MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC |
| OnCommitted func() // Called when the RPC has been committed (retries no longer possible) |
| Interceptor ClientInterceptor |
| } |
| |
| // ClientStream is the same as grpc.ClientStream, but defined here for circular |
| // dependency reasons. |
| type ClientStream interface { |
| // Header returns the header metadata received from the server if there |
| // is any. It blocks if the metadata is not ready to read. |
| Header() (metadata.MD, error) |
| // Trailer returns the trailer metadata from the server, if there is any. |
| // It must only be called after stream.CloseAndRecv has returned, or |
| // stream.Recv has returned a non-nil error (including io.EOF). |
| Trailer() metadata.MD |
| // CloseSend closes the send direction of the stream. It closes the stream |
| // when non-nil error is met. It is also not safe to call CloseSend |
| // concurrently with SendMsg. |
| CloseSend() error |
| // Context returns the context for this stream. |
| // |
| // It should not be called until after Header or RecvMsg has returned. Once |
| // called, subsequent client-side retries are disabled. |
| Context() context.Context |
| // SendMsg is generally called by generated code. On error, SendMsg aborts |
| // the stream. If the error was generated by the client, the status is |
| // returned directly; otherwise, io.EOF is returned and the status of |
| // the stream may be discovered using RecvMsg. |
| // |
| // SendMsg blocks until: |
| // - There is sufficient flow control to schedule m with the transport, or |
| // - The stream is done, or |
| // - The stream breaks. |
| // |
| // SendMsg does not wait until the message is received by the server. An |
| // untimely stream closure may result in lost messages. To ensure delivery, |
| // users should ensure the RPC completed successfully using RecvMsg. |
| // |
| // It is safe to have a goroutine calling SendMsg and another goroutine |
| // calling RecvMsg on the same stream at the same time, but it is not safe |
| // to call SendMsg on the same stream in different goroutines. It is also |
| // not safe to call CloseSend concurrently with SendMsg. |
| SendMsg(m interface{}) error |
| // RecvMsg blocks until it receives a message into m or the stream is |
| // done. It returns io.EOF when the stream completes successfully. On |
| // any other error, the stream is aborted and the error contains the RPC |
| // status. |
| // |
| // It is safe to have a goroutine calling SendMsg and another goroutine |
| // calling RecvMsg on the same stream at the same time, but it is not |
| // safe to call RecvMsg on the same stream in different goroutines. |
| RecvMsg(m interface{}) error |
| } |
| |
| // ClientInterceptor is an interceptor for gRPC client streams. |
| type ClientInterceptor interface { |
| // NewStream produces a ClientStream for an RPC which may optionally use |
| // the provided function to produce a stream for delegation. Note: |
| // RPCInfo.Context should not be used (will be nil). |
| // |
| // done is invoked when the RPC is finished using its connection, or could |
| // not be assigned a connection. RPC operations may still occur on |
| // ClientStream after done is called, since the interceptor is invoked by |
| // application-layer operations. done must never be nil when called. |
| NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error) |
| } |
| |
// ServerInterceptor intercepts incoming RPCs on the gRPC server side.
type ServerInterceptor interface {
	// AllowRPC checks whether an incoming RPC is allowed to proceed, based
	// on information about the connection the RPC was received on and on
	// its HTTP headers. That information will be piped into ctx.
	//
	// TODO: Make this a real interceptor for filters such as rate limiting.
	AllowRPC(ctx context.Context) error
}
| |
// csKeyType is a dedicated string type for the attribute key below, so the
// key is distinct from a plain string holding the same text.
type csKeyType string

// csKey is the attribute key under which SetConfigSelector stores the
// ConfigSelector and GetConfigSelector looks it up.
const csKey csKeyType = "grpc.internal.resolver.configSelector"
| |
| // SetConfigSelector sets the config selector in state and returns the new |
| // state. |
| func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State { |
| state.Attributes = state.Attributes.WithValue(csKey, cs) |
| return state |
| } |
| |
| // GetConfigSelector retrieves the config selector from state, if present, and |
| // returns it or nil if absent. |
| func GetConfigSelector(state resolver.State) ConfigSelector { |
| cs, _ := state.Attributes.Value(csKey).(ConfigSelector) |
| return cs |
| } |
| |
| // SafeConfigSelector allows for safe switching of ConfigSelector |
| // implementations such that previous values are guaranteed to not be in use |
| // when UpdateConfigSelector returns. |
| type SafeConfigSelector struct { |
| mu sync.RWMutex |
| cs ConfigSelector |
| } |
| |
| // UpdateConfigSelector swaps to the provided ConfigSelector and blocks until |
| // all uses of the previous ConfigSelector have completed. |
| func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) { |
| scs.mu.Lock() |
| defer scs.mu.Unlock() |
| scs.cs = cs |
| } |
| |
| // SelectConfig defers to the current ConfigSelector in scs. |
| func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) { |
| scs.mu.RLock() |
| defer scs.mu.RUnlock() |
| return scs.cs.SelectConfig(r) |
| } |