blob: 038afd26bdc59b4136e75031e7af761297e5a533 [file] [log] [blame]
Girish Kumar2ed051b2020-07-28 16:35:25 +00001// Copyright 2017 Michal Witkowski. All Rights Reserved.
2// See LICENSE for licensing terms.
3
4package grpc_ctxtags
5
6import (
7 "github.com/grpc-ecosystem/go-grpc-middleware"
8 "golang.org/x/net/context"
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/peer"
11)
12
13// UnaryServerInterceptor returns a new unary server interceptors that sets the values for request tags.
14func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
15 o := evaluateOptions(opts)
16 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
17 newCtx := newTagsForCtx(ctx)
18 if o.requestFieldsFunc != nil {
19 setRequestFieldTags(newCtx, o.requestFieldsFunc, info.FullMethod, req)
20 }
21 return handler(newCtx, req)
22 }
23}
24
25// StreamServerInterceptor returns a new streaming server interceptor that sets the values for request tags.
26func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
27 o := evaluateOptions(opts)
28 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
29 newCtx := newTagsForCtx(stream.Context())
30 if o.requestFieldsFunc == nil {
31 // Short-circuit, don't do the expensive bit of allocating a wrappedStream.
32 wrappedStream := grpc_middleware.WrapServerStream(stream)
33 wrappedStream.WrappedContext = newCtx
34 return handler(srv, wrappedStream)
35 }
36 wrapped := &wrappedStream{stream, info, o, newCtx, true}
37 err := handler(srv, wrapped)
38 return err
39 }
40}
41
// wrappedStream is a thin wrapper around grpc.ServerStream that allows modifying context and extracts log fields from the initial message.
type wrappedStream struct {
	// ServerStream is the wrapped stream; RecvMsg delegates to it before extracting fields.
	grpc.ServerStream
	// info describes the RPC being served; RecvMsg reads its IsClientStream and FullMethod.
	info *grpc.StreamServerInfo
	// opts holds the evaluated interceptor options (requestFieldsFunc, requestFieldsFromInitial).
	opts *options
	// WrappedContext is the wrapper's own Context. You can assign it.
	WrappedContext context.Context
	// initial is cleared by RecvMsg once it has performed a field extraction.
	initial bool
}
51
52// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
53func (w *wrappedStream) Context() context.Context {
54 return w.WrappedContext
55}
56
57func (w *wrappedStream) RecvMsg(m interface{}) error {
58 err := w.ServerStream.RecvMsg(m)
59 // We only do log fields extraction on the single-request of a server-side stream.
60 if !w.info.IsClientStream || w.opts.requestFieldsFromInitial && w.initial {
61 w.initial = false
62
63 setRequestFieldTags(w.Context(), w.opts.requestFieldsFunc, w.info.FullMethod, m)
64 }
65 return err
66}
67
68func newTagsForCtx(ctx context.Context) context.Context {
69 t := newTags()
70 if peer, ok := peer.FromContext(ctx); ok {
71 t.Set("peer.address", peer.Addr.String())
72 }
73 return setInContext(ctx, t)
74}
75
76func setRequestFieldTags(ctx context.Context, f RequestFieldExtractorFunc, fullMethodName string, req interface{}) {
77 if valMap := f(fullMethodName, req); valMap != nil {
78 t := Extract(ctx)
79 for k, v := range valMap {
80 t.Set("grpc.request."+k, v)
81 }
82 }
83}