khenaidoo | 106c61a | 2021-08-11 18:05:46 -0400 | [diff] [blame] | 1 | package sarama |
| 2 | |
| 3 | // ProducerInterceptor allows you to intercept (and possibly mutate) the records |
| 4 | // received by the producer before they are published to the Kafka cluster. |
| 5 | // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation |
| 6 | type ProducerInterceptor interface { |
| 7 | |
| 8 | // OnSend is called when the producer message is intercepted. Please avoid |
| 9 | // modifying the message until it's safe to do so, as this is _not_ a copy |
| 10 | // of the message. |
| 11 | OnSend(*ProducerMessage) |
| 12 | } |
| 13 | |
| 14 | // ConsumerInterceptor allows you to intercept (and possibly mutate) the records |
| 15 | // received by the consumer before they are sent to the messages channel. |
| 16 | // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation |
| 17 | type ConsumerInterceptor interface { |
| 18 | |
| 19 | // OnConsume is called when the consumed message is intercepted. Please |
| 20 | // avoid modifying the message until it's safe to do so, as this is _not_ a |
| 21 | // copy of the message. |
| 22 | OnConsume(*ConsumerMessage) |
| 23 | } |
| 24 | |
| 25 | func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerInterceptor) { |
| 26 | defer func() { |
| 27 | if r := recover(); r != nil { |
| 28 | Logger.Printf("Error when calling producer interceptor: %s, %w\n", interceptor, r) |
| 29 | } |
| 30 | }() |
| 31 | |
| 32 | interceptor.OnSend(msg) |
| 33 | } |
| 34 | |
| 35 | func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerInterceptor) { |
| 36 | defer func() { |
| 37 | if r := recover(); r != nil { |
| 38 | Logger.Printf("Error when calling consumer interceptor: %s, %w\n", interceptor, r) |
| 39 | } |
| 40 | }() |
| 41 | |
| 42 | interceptor.OnConsume(msg) |
| 43 | } |