/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "fmt"
23 "math"
24 "sync"
25 "sync/atomic"
26 "time"
27)
28
// timeAfterFunc schedules f to run after d has elapsed. It is a package-level
// variable, rather than a direct call to time.AfterFunc, so that unit tests
// can substitute their own implementation.
var timeAfterFunc = time.AfterFunc
33
// idlenessEnforcer abstracts the channel-side operations needed to move a
// channel into and out of idle mode. The implementation is provided by
// grpc.ClientConn.
type idlenessEnforcer interface {
	// enterIdleMode asks the channel to move into idle mode.
	enterIdleMode() error
	// exitIdleMode asks the channel to leave idle mode.
	exitIdleMode() error
}
40
// idlenessManager is the set of hooks through which RPC activity on a channel
// is reported, so that the channel's idleness can be tracked.
type idlenessManager interface {
	// onCallBegin is invoked at the start of every RPC. A non-nil error
	// causes the RPC to fail.
	onCallBegin() error
	// onCallEnd is invoked at the end of every RPC.
	onCallEnd()
	// close shuts the manager down.
	close()
}
48
// noopIdlenessManager provides the idlenessManager methods as no-ops. It keeps
// no state and never reports an error.
type noopIdlenessManager struct{}

// onCallBegin always allows the call.
func (noopIdlenessManager) onCallBegin() error {
	return nil
}

// onCallEnd does nothing.
func (noopIdlenessManager) onCallEnd() {
}

// close does nothing.
func (noopIdlenessManager) close() {
}
54
55// idlenessManagerImpl implements the idlenessManager interface. It uses atomic
56// operations to synchronize access to shared state and a mutex to guarantee
57// mutual exclusion in a critical section.
58type idlenessManagerImpl struct {
59 // State accessed atomically.
60 lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
61 activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
62 activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
63 closed int32 // Boolean; True when the manager is closed.
64
65 // Can be accessed without atomics or mutex since these are set at creation
66 // time and read-only after that.
67 enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
68 timeout int64 // Idle timeout duration nanos stored as an int64.
69
70 // idleMu is used to guarantee mutual exclusion in two scenarios:
71 // - Opposing intentions:
72 // - a: Idle timeout has fired and handleIdleTimeout() is trying to put
73 // the channel in idle mode because the channel has been inactive.
74 // - b: At the same time an RPC is made on the channel, and onCallBegin()
75 // is trying to prevent the channel from going idle.
76 // - Competing intentions:
77 // - The channel is in idle mode and there are multiple RPCs starting at
78 // the same time, all trying to move the channel out of idle. Only one
79 // of them should succeed in doing so, while the other RPCs should
80 // piggyback on the first one and be successfully handled.
81 idleMu sync.RWMutex
82 actuallyIdle bool
83 timer *time.Timer
84}
85
86// newIdlenessManager creates a new idleness manager implementation for the
87// given idle timeout.
88func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
89 if idleTimeout == 0 {
90 return noopIdlenessManager{}
91 }
92
93 i := &idlenessManagerImpl{
94 enforcer: enforcer,
95 timeout: int64(idleTimeout),
96 }
97 i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout)
98 return i
99}
100
101// resetIdleTimer resets the idle timer to the given duration. This method
102// should only be called from the timer callback.
103func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) {
104 i.idleMu.Lock()
105 defer i.idleMu.Unlock()
106
107 if i.timer == nil {
108 // Only close sets timer to nil. We are done.
109 return
110 }
111
112 // It is safe to ignore the return value from Reset() because this method is
113 // only ever called from the timer callback, which means the timer has
114 // already fired.
115 i.timer.Reset(d)
116}
117
118// handleIdleTimeout is the timer callback that is invoked upon expiry of the
119// configured idle timeout. The channel is considered inactive if there are no
120// ongoing calls and no RPC activity since the last time the timer fired.
121func (i *idlenessManagerImpl) handleIdleTimeout() {
122 if i.isClosed() {
123 return
124 }
125
126 if atomic.LoadInt32(&i.activeCallsCount) > 0 {
127 i.resetIdleTimer(time.Duration(i.timeout))
128 return
129 }
130
131 // There has been activity on the channel since we last got here. Reset the
132 // timer and return.
133 if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
134 // Set the timer to fire after a duration of idle timeout, calculated
135 // from the time the most recent RPC completed.
136 atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0)
137 i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano()))
138 return
139 }
140
141 // This CAS operation is extremely likely to succeed given that there has
142 // been no activity since the last time we were here. Setting the
143 // activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the
144 // channel is either in idle mode or is trying to get there.
145 if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) {
146 // This CAS operation can fail if an RPC started after we checked for
147 // activity at the top of this method, or one was ongoing from before
148 // the last time we were here. In both case, reset the timer and return.
149 i.resetIdleTimer(time.Duration(i.timeout))
150 return
151 }
152
153 // Now that we've set the active calls count to -math.MaxInt32, it's time to
154 // actually move to idle mode.
155 if i.tryEnterIdleMode() {
156 // Successfully entered idle mode. No timer needed until we exit idle.
157 return
158 }
159
160 // Failed to enter idle mode due to a concurrent RPC that kept the channel
161 // active, or because of an error from the channel. Undo the attempt to
162 // enter idle, and reset the timer to try again later.
163 atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
164 i.resetIdleTimer(time.Duration(i.timeout))
165}
166
167// tryEnterIdleMode instructs the channel to enter idle mode. But before
168// that, it performs a last minute check to ensure that no new RPC has come in,
169// making the channel active.
170//
171// Return value indicates whether or not the channel moved to idle mode.
172//
173// Holds idleMu which ensures mutual exclusion with exitIdleMode.
174func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
175 i.idleMu.Lock()
176 defer i.idleMu.Unlock()
177
178 if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 {
179 // We raced and lost to a new RPC. Very rare, but stop entering idle.
180 return false
181 }
182 if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
183 // An very short RPC could have come in (and also finished) after we
184 // checked for calls count and activity in handleIdleTimeout(), but
185 // before the CAS operation. So, we need to check for activity again.
186 return false
187 }
188
189 // No new RPCs have come in since we last set the active calls count value
190 // -math.MaxInt32 in the timer callback. And since we have the lock, it is
191 // safe to enter idle mode now.
192 if err := i.enforcer.enterIdleMode(); err != nil {
193 logger.Errorf("Failed to enter idle mode: %v", err)
194 return false
195 }
196
197 // Successfully entered idle mode.
198 i.actuallyIdle = true
199 return true
200}
201
202// onCallBegin is invoked at the start of every RPC.
203func (i *idlenessManagerImpl) onCallBegin() error {
204 if i.isClosed() {
205 return nil
206 }
207
208 if atomic.AddInt32(&i.activeCallsCount, 1) > 0 {
209 // Channel is not idle now. Set the activity bit and allow the call.
210 atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
211 return nil
212 }
213
214 // Channel is either in idle mode or is in the process of moving to idle
215 // mode. Attempt to exit idle mode to allow this RPC.
216 if err := i.exitIdleMode(); err != nil {
217 // Undo the increment to calls count, and return an error causing the
218 // RPC to fail.
219 atomic.AddInt32(&i.activeCallsCount, -1)
220 return err
221 }
222
223 atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
224 return nil
225}
226
227// exitIdleMode instructs the channel to exit idle mode.
228//
229// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
230func (i *idlenessManagerImpl) exitIdleMode() error {
231 i.idleMu.Lock()
232 defer i.idleMu.Unlock()
233
234 if !i.actuallyIdle {
235 // This can happen in two scenarios:
236 // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
237 // tryEnterIdleMode(). But before the latter could grab the lock, an RPC
238 // came in and onCallBegin() noticed that the calls count is negative.
239 // - Channel is in idle mode, and multiple new RPCs come in at the same
240 // time, all of them notice a negative calls count in onCallBegin and get
241 // here. The first one to get the lock would got the channel to exit idle.
242 //
243 // Either way, nothing to do here.
244 return nil
245 }
246
247 if err := i.enforcer.exitIdleMode(); err != nil {
248 return fmt.Errorf("channel failed to exit idle mode: %v", err)
249 }
250
251 // Undo the idle entry process. This also respects any new RPC attempts.
252 atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
253 i.actuallyIdle = false
254
255 // Start a new timer to fire after the configured idle timeout.
256 i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout)
257 return nil
258}
259
260// onCallEnd is invoked at the end of every RPC.
261func (i *idlenessManagerImpl) onCallEnd() {
262 if i.isClosed() {
263 return
264 }
265
266 // Record the time at which the most recent call finished.
267 atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano())
268
269 // Decrement the active calls count. This count can temporarily go negative
270 // when the timer callback is in the process of moving the channel to idle
271 // mode, but one or more RPCs come in and complete before the timer callback
272 // can get done with the process of moving to idle mode.
273 atomic.AddInt32(&i.activeCallsCount, -1)
274}
275
276func (i *idlenessManagerImpl) isClosed() bool {
277 return atomic.LoadInt32(&i.closed) == 1
278}
279
280func (i *idlenessManagerImpl) close() {
281 atomic.StoreInt32(&i.closed, 1)
282
283 i.idleMu.Lock()
284 i.timer.Stop()
285 i.timer = nil
286 i.idleMu.Unlock()
287}