001/* 002 * Copyright 2009-2018 Ping Identity Corporation 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright (C) 2009-2018 Ping Identity Corporation 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021package com.unboundid.util; 022 023 024 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import java.util.logging.Level; 030 031import static com.unboundid.util.Debug.*; 032 033 034 035/** 036 * Instances of this class are used to ensure that certain actions are performed 037 * at a fixed rate per interval (e.g. 10000 search operations per second). 038 * <p> 039 * Once a class is constructed with the duration of an interval and the target 040 * per interval, the {@link #await} method only releases callers at the 041 * specified number of times per interval. This class is most useful when 042 * the target number per interval exceeds the limits of other approaches 043 * such as {@code java.util.Timer} or 044 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}. For instance, 045 * this does a good job of ensuring that something happens about 10000 times 046 * per second, but it's overkill to ensure something happens five times per 047 * hour. This does come at a cost. In the worst case, a single thread is 048 * tied up in a loop doing a small amount of computation followed by a 049 * Thread.yield(). Calling Thread.sleep() is not possible because many 050 * platforms sleep for a minimum of 10ms, and all platforms require sleeping 051 * for at least 1ms. 052 * <p> 053 * Testing has shown that this class is accurate for a "no-op" 054 * action up to two million per second, which vastly exceeds its 055 * typical use in tools such as {@code searchrate} and {@code modrate}. This 056 * class is designed to be called by multiple threads, however, it does not 057 * make any fairness guarantee between threads; a single-thread might be 058 * released from the {@link #await} method many times before another thread 059 * that is blocked in that method. 060 * <p> 061 * This class attempts to smooth out the target per interval throughout each 062 * interval. At a given ratio, R between 0 and 1, through the interval, the 063 * expected number of actions to have been performed in the interval at that 064 * time is R times the target per interval. That is, 10% of the way through 065 * the interval, approximately 10% of the actions have been performed, and 066 * 80% of the way through the interval, 80% of the actions have been performed. 067 * <p> 068 * It's possible to wait for multiple "actions" in one call with 069 * {@link #await(int)}. An example use is rate limiting writing bytes out to 070 * a file. You could configure a FixedRateBarrier to only allow 1M bytes to 071 * be written per second, and then call {@link #await(int)} with the size of 072 * the byte buffer to write. The call to {@link #await(int)} would block until 073 * writing out the buffer would not exceed the desired rate. 074 */ 075@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE) 076public final class FixedRateBarrier 077 implements Serializable 078{ 079 /** 080 * The minimum number of milliseconds that Thread.sleep() can handle 081 * accurately. This varies from platform to platform, so we measure it 082 * once in the static initializer below. When using a low rate (such as 083 * 100 per second), we can often sleep between iterations instead of having 084 * to spin calling Thread.yield(). 085 */ 086 private static final long minSleepMillis; 087 static 088 { 089 // Calibrate the minimum number of milliseconds that we can reliably 090 // sleep on this system. We take several measurements and take the median, 091 // which keeps us from choosing an outlier. 092 // 093 // It varies from system to system. Testing on three systems, yielded 094 // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms), 095 // Windows 7 (1 ms). 096 097 final List<Long> minSleepMillisMeasurements = new ArrayList<>(11); 098 099 for (int i = 0; i < 11; i++) 100 { 101 final long timeBefore = System.currentTimeMillis(); 102 try 103 { 104 Thread.sleep(1); 105 } 106 catch (final InterruptedException e) 107 { 108 debugException(e); 109 } 110 final long sleepMillis = System.currentTimeMillis() - timeBefore; 111 minSleepMillisMeasurements.add(sleepMillis); 112 } 113 114 Collections.sort(minSleepMillisMeasurements); 115 final long medianSleepMillis = minSleepMillisMeasurements.get( 116 minSleepMillisMeasurements.size()/2); 117 118 minSleepMillis = Math.max(medianSleepMillis, 1); 119 120 final String message = "Calibrated FixedRateBarrier to use " + 121 "minSleepMillis=" + minSleepMillis + ". " + 122 "Minimum sleep measurements = " + minSleepMillisMeasurements; 123 debug(Level.INFO, DebugType.OTHER, message); 124 } 125 126 127 128 /** 129 * The serial version UID for this serializable class. 130 */ 131 private static final long serialVersionUID = -9048370191248737239L; 132 133 134 135 // This tracks when this class is shut down. Calls to await() after 136 // shutdownRequested() is called, will return immediately with a value of 137 // true. 138 private volatile boolean shutdownRequested = false; 139 140 141 // 142 // The following class variables are guarded by synchronized(this). 143 // 144 145 // The duration of the target interval in nano-seconds. 146 private long intervalDurationNanos; 147 148 // This tracks the number of milliseconds between each iteration if they 149 // were evenly spaced. 150 // 151 // If intervalDurationMs=1000 and perInterval=100, then this is 100. 152 // If intervalDurationMs=1000 and perInterval=10000, then this is .1. 153 private double millisBetweenIterations; 154 155 // The target number of times to release a thread per interval. 156 private int perInterval; 157 158 // A count of the number of times that await has returned within the current 159 // interval. 160 private long countInThisInterval; 161 162 // The start of this interval in terms of System.nanoTime(). 163 private long intervalStartNanos; 164 165 // The end of this interval in terms of System.nanoTime(). 166 private long intervalEndNanos; 167 168 169 170 /** 171 * Constructs a new FixedRateBarrier, which is active until 172 * {@link #shutdownRequested} is called. 173 * 174 * @param intervalDurationMs The duration of the interval in milliseconds. 175 * @param perInterval The target number of times that {@link #await} should 176 * return per interval. 177 */ 178 public FixedRateBarrier(final long intervalDurationMs, final int perInterval) 179 { 180 setRate(intervalDurationMs, perInterval); 181 } 182 183 184 185 /** 186 * Updates the rates associated with this FixedRateBarrier. The new rate 187 * will be in effect when this method returns. 188 * 189 * @param intervalDurationMs The duration of the interval in milliseconds. 190 * @param perInterval The target number of times that {@link #await} should 191 * return per interval. 192 */ 193 public synchronized void setRate(final long intervalDurationMs, 194 final int perInterval) 195 { 196 Validator.ensureTrue(intervalDurationMs > 0, 197 "FixedRateBarrier.intervalDurationMs must be at least 1."); 198 Validator.ensureTrue(perInterval > 0, 199 "FixedRateBarrier.perInterval must be at least 1."); 200 201 this.perInterval = perInterval; 202 203 intervalDurationNanos = 1000L * 1000L * intervalDurationMs; 204 205 millisBetweenIterations = (double)intervalDurationMs/(double)perInterval; 206 207 // Reset the intervals and all of the counters. 208 countInThisInterval = 0; 209 intervalStartNanos = 0; 210 intervalEndNanos = 0; 211 } 212 213 214 215 /** 216 * This method waits until it is time for the next 'action' to be performed 217 * based on the specified interval duration and target per interval. This 218 * method can be called by multiple threads simultaneously. This method 219 * returns immediately if shutdown has been requested. 220 * 221 * @return {@code true} if shutdown has been requested and {@code} false 222 * otherwise. 223 */ 224 public synchronized boolean await() 225 { 226 return await(1); 227 } 228 229 230 231 /** 232 * This method waits until it is time for the next {@code count} 'actions' 233 * to be performed based on the specified interval duration and target per 234 * interval. To achieve the target rate, it's recommended that on average 235 * {@code count} is small relative to {@code perInterval} (and the 236 * {@code count} must not be larger than {@code perInterval}). A 237 * {@code count} value will not be split across intervals, and due to timing 238 * issues, it's possible that a {@code count} that barely fits in the 239 * current interval will need to wait until the next interval. If it's not 240 * possible to use smaller 'count' values, then increase {@code perInterval} 241 * and {@code intervalDurationMs} by the same relative amount. As an 242 * example, if {@code count} is on average 1/10 as big as 243 * {@code perInterval}, then you can expect to attain 90% of the target 244 * rate. Increasing {@code perInterval} and {@code intervalDurationMs} by 245 * 10x means that 99% of the target rate can be achieved. 246 * <p> 247 * This method can be called by multiple threads simultaneously. This method 248 * returns immediately if shutdown has been requested. 249 * 250 * @param count The number of 'actions' being performed. It must be less 251 * than or equal to {@code perInterval}, and is recommended to 252 * be fairly small relative to {@code perInterval} so that it 253 * is easier to achieve the desired rate and exhibit smoother 254 * performance. 255 * 256 * @return {@code true} if shutdown has been requested and {@code} false 257 * otherwise. 258 */ 259 public synchronized boolean await(final int count) 260 { 261 if (count > perInterval) 262 { 263 Validator.ensureTrue(false, 264 "FixedRateBarrier.await(int) count value " + count + 265 " exceeds perInterval value " + perInterval + 266 ". The provided count value must be less than or equal to " + 267 "the perInterval value."); 268 } 269 else if (count <= 0) 270 { 271 return shutdownRequested; 272 } 273 274 // Loop forever until we are requested to shutdown or it is time to perform 275 // the next 'action' in which case we break from the loop. 276 while (!shutdownRequested) 277 { 278 final long now = System.nanoTime(); 279 280 if ((intervalStartNanos == 0) || // Handles the first time we're called. 281 (now < intervalStartNanos)) // Handles a change in the clock. 282 { 283 intervalStartNanos = now; 284 intervalEndNanos = intervalStartNanos + intervalDurationNanos; 285 } 286 else if (now >= intervalEndNanos) // End of an interval. 287 { 288 countInThisInterval = 0; 289 290 if (now < (intervalEndNanos + intervalDurationNanos)) 291 { 292 // If we have already passed the end of the next interval, then we 293 // don't try to catch up. Instead we just reset the start of the 294 // next interval to now. This could happen if the system clock 295 // was set to the future, we're running in a debugger, or we have 296 // very short intervals and are unable to keep up. 297 intervalStartNanos = now; 298 } 299 else 300 { 301 // Usually we're some small fraction into the next interval, so 302 // we set the start of the current interval to the end of the 303 // previous one. 304 intervalStartNanos = intervalEndNanos; 305 } 306 intervalEndNanos = intervalStartNanos + intervalDurationNanos; 307 } 308 309 final long intervalRemaining = intervalEndNanos - now; 310 if (intervalRemaining <= 0) 311 { 312 // This shouldn't happen, but we're careful not to divide by 0. 313 continue; 314 } 315 316 final double intervalFractionRemaining = 317 (double) intervalRemaining / intervalDurationNanos; 318 319 final double expectedRemaining = intervalFractionRemaining * perInterval; 320 final long actualRemaining = perInterval - countInThisInterval; 321 322 final long countBehind = 323 (long)Math.ceil(actualRemaining - expectedRemaining); 324 325 if (count <= countBehind) 326 { 327 // We are on schedule or behind schedule so let the 'action(s)' 328 // happen. 329 countInThisInterval += count; 330 break; 331 } 332 else 333 { 334 // If we can sleep until it's time to leave this barrier, then do 335 // so to keep from spinning on a CPU doing Thread.yield(). 336 337 final long countNeeded = count - countBehind; 338 final long remainingMillis = 339 (long) Math.floor(millisBetweenIterations * countNeeded); 340 341 if (remainingMillis >= minSleepMillis) 342 { 343 // Cap how long we sleep so that we can respond to a change in the 344 // rate without too much delay. 345 try 346 { 347 // We need to wait here instead of Thread.sleep so that we don't 348 // block setRate. Also, cap how long we sleep so that we can 349 // respond to a change in the rate without too much delay. 350 final long waitTime = Math.min(remainingMillis, 10); 351 wait(waitTime); 352 } 353 catch (final InterruptedException e) 354 { 355 debugException(e); 356 Thread.currentThread().interrupt(); 357 return shutdownRequested; 358 } 359 } 360 else 361 { 362 // We're ahead of schedule so yield to other threads, and then try 363 // again. Note: this is the most costly part of the algorithm because 364 // we have to busy wait due to the lack of sleeping for very small 365 // amounts of time. 366 Thread.yield(); 367 } 368 } 369 } 370 371 return shutdownRequested; 372 } 373 374 375 376 /** 377 * Retrieves information about the current target rate for this barrier. The 378 * value returned will include a {@code Long} that specifies the duration of 379 * the current interval in milliseconds and an {@code Integer} that specifies 380 * the number of times that the {@link #await} method should return per 381 * interval. 382 * 383 * @return Information about hte current target rate for this barrier. 384 */ 385 public synchronized ObjectPair<Long,Integer> getTargetRate() 386 { 387 return new ObjectPair<>( 388 (intervalDurationNanos / (1000L * 1000L)), 389 perInterval); 390 } 391 392 393 394 /** 395 * Shuts down this barrier. Future calls to await() will return immediately. 396 */ 397 public void shutdownRequested() 398 { 399 shutdownRequested = true; 400 } 401 402 403 404 /** 405 * Returns {@code true} if shutdown has been requested. 406 * 407 * @return {@code true} if shutdown has been requested and {@code false} 408 * otherwise. 409 */ 410 public boolean isShutdownRequested() 411 { 412 return shutdownRequested; 413 } 414}