blob: a7ced60f5b95af3d9693630406ba76617774471d [file] [log] [blame]
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
3
4package grpc_ctxtags
5
6import (
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +00007 "context"
8
Girish Kumar2ed051b2020-07-28 16:35:25 +00009 "google.golang.org/grpc"
10 "google.golang.org/grpc/peer"
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +000011
12 "github.com/grpc-ecosystem/go-grpc-middleware"
Girish Kumar2ed051b2020-07-28 16:35:25 +000013)
14
15// UnaryServerInterceptor returns a new unary server interceptors that sets the values for request tags.
16func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
17 o := evaluateOptions(opts)
18 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
19 newCtx := newTagsForCtx(ctx)
20 if o.requestFieldsFunc != nil {
21 setRequestFieldTags(newCtx, o.requestFieldsFunc, info.FullMethod, req)
22 }
23 return handler(newCtx, req)
24 }
25}
26
27// StreamServerInterceptor returns a new streaming server interceptor that sets the values for request tags.
28func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
29 o := evaluateOptions(opts)
30 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
31 newCtx := newTagsForCtx(stream.Context())
32 if o.requestFieldsFunc == nil {
33 // Short-circuit, don't do the expensive bit of allocating a wrappedStream.
34 wrappedStream := grpc_middleware.WrapServerStream(stream)
35 wrappedStream.WrappedContext = newCtx
36 return handler(srv, wrappedStream)
37 }
38 wrapped := &wrappedStream{stream, info, o, newCtx, true}
39 err := handler(srv, wrapped)
40 return err
41 }
42}
43
44// wrappedStream is a thin wrapper around grpc.ServerStream that allows modifying context and extracts log fields from the initial message.
45type wrappedStream struct {
46 grpc.ServerStream
47 info *grpc.StreamServerInfo
48 opts *options
49 // WrappedContext is the wrapper's own Context. You can assign it.
50 WrappedContext context.Context
51 initial bool
52}
53
54// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
55func (w *wrappedStream) Context() context.Context {
56 return w.WrappedContext
57}
58
59func (w *wrappedStream) RecvMsg(m interface{}) error {
60 err := w.ServerStream.RecvMsg(m)
61 // We only do log fields extraction on the single-request of a server-side stream.
62 if !w.info.IsClientStream || w.opts.requestFieldsFromInitial && w.initial {
63 w.initial = false
64
65 setRequestFieldTags(w.Context(), w.opts.requestFieldsFunc, w.info.FullMethod, m)
66 }
67 return err
68}
69
70func newTagsForCtx(ctx context.Context) context.Context {
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +000071 t := NewTags()
Girish Kumar2ed051b2020-07-28 16:35:25 +000072 if peer, ok := peer.FromContext(ctx); ok {
73 t.Set("peer.address", peer.Addr.String())
74 }
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +000075 return SetInContext(ctx, t)
Girish Kumar2ed051b2020-07-28 16:35:25 +000076}
77
78func setRequestFieldTags(ctx context.Context, f RequestFieldExtractorFunc, fullMethodName string, req interface{}) {
79 if valMap := f(fullMethodName, req); valMap != nil {
80 t := Extract(ctx)
81 for k, v := range valMap {
82 t.Set("grpc.request."+k, v)
83 }
84 }
85}