001/*
002 * Copyright 2009-2018 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2009-2018 Ping Identity Corporation
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.Serializable;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.logging.Level;
030
031import static com.unboundid.util.Debug.*;
032
033
034
035/**
036 * Instances of this class are used to ensure that certain actions are performed
037 * at a fixed rate per interval (e.g. 10000 search operations per second).
038 * <p>
039 * Once a class is constructed with the duration of an interval and the target
040 * per interval, the {@link #await} method only releases callers at the
041 * specified number of times per interval.  This class is most useful when
042 * the target number per interval exceeds the limits of other approaches
043 * such as {@code java.util.Timer} or
044 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}.  For instance,
045 * this does a good job of ensuring that something happens about 10000 times
046 * per second, but it's overkill to ensure something happens five times per
047 * hour.  This does come at a cost.  In the worst case, a single thread is
048 * tied up in a loop doing a small amount of computation followed by a
049 * Thread.yield().  Calling Thread.sleep() is not possible because many
050 * platforms sleep for a minimum of 10ms, and all platforms require sleeping
051 * for at least 1ms.
052 * <p>
053 * Testing has shown that this class is accurate for a "no-op"
054 * action up to two million per second, which vastly exceeds its
055 * typical use in tools such as {@code searchrate} and {@code modrate}.  This
056 * class is designed to be called by multiple threads, however, it does not
057 * make any fairness guarantee between threads; a single-thread might be
058 * released from the {@link #await} method many times before another thread
059 * that is blocked in that method.
060 * <p>
061 * This class attempts to smooth out the target per interval throughout each
062 * interval.  At a given ratio, R between 0 and 1, through the interval, the
063 * expected number of actions to have been performed in the interval at that
064 * time is R times the target per interval.  That is, 10% of the way through
065 * the interval, approximately 10% of the actions have been performed, and
066 * 80% of the way through the interval, 80% of the actions have been performed.
067 * <p>
068 * It's possible to wait for multiple "actions" in one call with
069 * {@link #await(int)}. An example use is rate limiting writing bytes out to
070 * a file. You could configure a FixedRateBarrier to only allow 1M bytes to
071 * be written per second, and then call {@link #await(int)} with the size of
072 * the byte buffer to write. The call to {@link #await(int)} would block until
073 * writing out the buffer would not exceed the desired rate.
074 */
075@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
076public final class FixedRateBarrier
077       implements Serializable
078{
079  /**
080   * The minimum number of milliseconds that Thread.sleep() can handle
081   * accurately.  This varies from platform to platform, so we measure it
082   * once in the static initializer below.  When using a low rate (such as
083   * 100 per second), we can often sleep between iterations instead of having
084   * to spin calling Thread.yield().
085   */
086  private static final long minSleepMillis;
087  static
088  {
089    // Calibrate the minimum number of milliseconds that we can reliably
090    // sleep on this system.  We take several measurements and take the median,
091    // which keeps us from choosing an outlier.
092    //
093    // It varies from system to system.  Testing on three systems, yielded
094    // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms),
095    // Windows 7 (1 ms).
096
097    final List<Long> minSleepMillisMeasurements = new ArrayList<>(11);
098
099    for (int i = 0; i < 11; i++)
100    {
101      final long timeBefore = System.currentTimeMillis();
102      try
103      {
104        Thread.sleep(1);
105      }
106      catch (final InterruptedException e)
107      {
108        debugException(e);
109      }
110      final long sleepMillis = System.currentTimeMillis() - timeBefore;
111      minSleepMillisMeasurements.add(sleepMillis);
112    }
113
114    Collections.sort(minSleepMillisMeasurements);
115    final long medianSleepMillis = minSleepMillisMeasurements.get(
116            minSleepMillisMeasurements.size()/2);
117
118    minSleepMillis = Math.max(medianSleepMillis, 1);
119
120    final String message = "Calibrated FixedRateBarrier to use " +
121          "minSleepMillis=" + minSleepMillis + ".  " +
122          "Minimum sleep measurements = " + minSleepMillisMeasurements;
123    debug(Level.INFO, DebugType.OTHER, message);
124  }
125
126
127
128  /**
129   * The serial version UID for this serializable class.
130   */
131  private static final long serialVersionUID = -9048370191248737239L;
132
133
134
135  // This tracks when this class is shut down.  Calls to await() after
136  // shutdownRequested() is called, will return immediately with a value of
137  // true.
138  private volatile boolean shutdownRequested = false;
139
140
141  //
142  // The following class variables are guarded by synchronized(this).
143  //
144
145  // The duration of the target interval in nano-seconds.
146  private long intervalDurationNanos;
147
148  // This tracks the number of milliseconds between each iteration if they
149  // were evenly spaced.
150  //
151  // If intervalDurationMs=1000 and perInterval=100, then this is 100.
152  // If intervalDurationMs=1000 and perInterval=10000, then this is .1.
153  private double millisBetweenIterations;
154
155  // The target number of times to release a thread per interval.
156  private int perInterval;
157
158  // A count of the number of times that await has returned within the current
159  // interval.
160  private long countInThisInterval;
161
162  // The start of this interval in terms of System.nanoTime().
163  private long intervalStartNanos;
164
165  // The end of this interval in terms of System.nanoTime().
166  private long intervalEndNanos;
167
168
169
170  /**
171   * Constructs a new FixedRateBarrier, which is active until
172   * {@link #shutdownRequested} is called.
173   *
174   * @param  intervalDurationMs  The duration of the interval in milliseconds.
175   * @param  perInterval  The target number of times that {@link #await} should
176   *                      return per interval.
177   */
178  public FixedRateBarrier(final long intervalDurationMs, final int perInterval)
179  {
180    setRate(intervalDurationMs, perInterval);
181  }
182
183
184
185  /**
186   * Updates the rates associated with this FixedRateBarrier.  The new rate
187   * will be in effect when this method returns.
188   *
189   * @param  intervalDurationMs  The duration of the interval in milliseconds.
190   * @param  perInterval  The target number of times that {@link #await} should
191   *                      return per interval.
192   */
193  public synchronized void setRate(final long intervalDurationMs,
194                                   final int perInterval)
195  {
196    Validator.ensureTrue(intervalDurationMs > 0,
197         "FixedRateBarrier.intervalDurationMs must be at least 1.");
198    Validator.ensureTrue(perInterval > 0,
199         "FixedRateBarrier.perInterval must be at least 1.");
200
201    this.perInterval = perInterval;
202
203    intervalDurationNanos = 1000L * 1000L * intervalDurationMs;
204
205    millisBetweenIterations = (double)intervalDurationMs/(double)perInterval;
206
207    // Reset the intervals and all of the counters.
208    countInThisInterval = 0;
209    intervalStartNanos = 0;
210    intervalEndNanos = 0;
211  }
212
213
214
215  /**
216   * This method waits until it is time for the next 'action' to be performed
217   * based on the specified interval duration and target per interval.  This
218   * method can be called by multiple threads simultaneously.  This method
219   * returns immediately if shutdown has been requested.
220   *
221   * @return  {@code true} if shutdown has been requested and {@code} false
222   *          otherwise.
223   */
224  public synchronized boolean await()
225  {
226    return await(1);
227  }
228
229
230
231  /**
232   * This method waits until it is time for the next {@code count} 'actions'
233   * to be performed based on the specified interval duration and target per
234   * interval.  To achieve the target rate, it's recommended that on average
235   * {@code count} is small relative to {@code perInterval} (and the
236   * {@code count} must not be larger than {@code perInterval}).  A
237   * {@code count} value will not be split across intervals, and due to timing
238   * issues, it's possible that a {@code count} that barely fits in the
239   * current interval will need to wait until the next interval.  If it's not
240   * possible to use smaller 'count' values, then increase {@code perInterval}
241   * and {@code intervalDurationMs} by the same relative amount.  As an
242   * example, if {@code count} is on average 1/10 as big as
243   * {@code perInterval}, then you can expect to attain 90% of the target
244   * rate.  Increasing {@code perInterval} and {@code intervalDurationMs} by
245   * 10x means that 99% of the target rate can be achieved.
246   * <p>
247   * This method can be called by multiple threads simultaneously.  This method
248   * returns immediately if shutdown has been requested.
249   *
250   * @param  count  The number of 'actions' being performed.  It must be less
251   *                than or equal to {@code perInterval}, and is recommended to
252   *                be fairly small relative to {@code perInterval} so that it
253   *                is easier to achieve the desired rate and exhibit smoother
254   *                performance.
255   *
256   * @return  {@code true} if shutdown has been requested and {@code} false
257   *          otherwise.
258   */
259  public synchronized boolean await(final int count)
260  {
261    if (count > perInterval)
262    {
263      Validator.ensureTrue(false,
264           "FixedRateBarrier.await(int) count value " + count +
265                " exceeds perInterval value " + perInterval +
266                ".  The provided count value must be less than or equal to " +
267                "the perInterval value.");
268    }
269    else if (count <= 0)
270    {
271      return shutdownRequested;
272    }
273
274    // Loop forever until we are requested to shutdown or it is time to perform
275    // the next 'action' in which case we break from the loop.
276    while (!shutdownRequested)
277    {
278      final long now = System.nanoTime();
279
280      if ((intervalStartNanos == 0) ||   // Handles the first time we're called.
281          (now < intervalStartNanos))    // Handles a change in the clock.
282      {
283        intervalStartNanos = now;
284        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
285      }
286      else if (now >= intervalEndNanos)  // End of an interval.
287      {
288        countInThisInterval = 0;
289
290        if (now < (intervalEndNanos + intervalDurationNanos))
291        {
292          // If we have already passed the end of the next interval, then we
293          // don't try to catch up.  Instead we just reset the start of the
294          // next interval to now.  This could happen if the system clock
295          // was set to the future, we're running in a debugger, or we have
296          // very short intervals and are unable to keep up.
297          intervalStartNanos = now;
298        }
299        else
300        {
301          // Usually we're some small fraction into the next interval, so
302          // we set the start of the current interval to the end of the
303          // previous one.
304          intervalStartNanos = intervalEndNanos;
305        }
306        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
307      }
308
309      final long intervalRemaining = intervalEndNanos - now;
310      if (intervalRemaining <= 0)
311      {
312        // This shouldn't happen, but we're careful not to divide by 0.
313        continue;
314      }
315
316      final double intervalFractionRemaining =
317           (double) intervalRemaining / intervalDurationNanos;
318
319      final double expectedRemaining = intervalFractionRemaining * perInterval;
320      final long actualRemaining = perInterval - countInThisInterval;
321
322      final long countBehind =
323              (long)Math.ceil(actualRemaining - expectedRemaining);
324
325      if (count <= countBehind)
326      {
327        // We are on schedule or behind schedule so let the 'action(s)'
328        // happen.
329        countInThisInterval += count;
330        break;
331      }
332      else
333      {
334        // If we can sleep until it's time to leave this barrier, then do
335        // so to keep from spinning on a CPU doing Thread.yield().
336
337        final long countNeeded = count - countBehind;
338        final long remainingMillis =
339             (long) Math.floor(millisBetweenIterations * countNeeded);
340
341        if (remainingMillis >= minSleepMillis)
342        {
343          // Cap how long we sleep so that we can respond to a change in the
344          // rate without too much delay.
345          try
346          {
347            // We need to wait here instead of Thread.sleep so that we don't
348            // block setRate.  Also, cap how long we sleep so that we can
349            // respond to a change in the rate without too much delay.
350            final long waitTime = Math.min(remainingMillis, 10);
351            wait(waitTime);
352          }
353          catch (final InterruptedException e)
354          {
355            debugException(e);
356            Thread.currentThread().interrupt();
357            return shutdownRequested;
358          }
359        }
360        else
361        {
362          // We're ahead of schedule so yield to other threads, and then try
363          // again.  Note: this is the most costly part of the algorithm because
364          // we have to busy wait due to the lack of sleeping for very small
365          // amounts of time.
366          Thread.yield();
367        }
368      }
369    }
370
371    return shutdownRequested;
372  }
373
374
375
376  /**
377   * Retrieves information about the current target rate for this barrier.  The
378   * value returned will include a {@code Long} that specifies the duration of
379   * the current interval in milliseconds and an {@code Integer} that specifies
380   * the number of times that the {@link #await} method should return per
381   * interval.
382   *
383   * @return  Information about hte current target rate for this barrier.
384   */
385  public synchronized ObjectPair<Long,Integer> getTargetRate()
386  {
387    return new ObjectPair<>(
388         (intervalDurationNanos / (1000L * 1000L)),
389         perInterval);
390  }
391
392
393
394  /**
395   * Shuts down this barrier.  Future calls to await() will return immediately.
396   */
397  public void shutdownRequested()
398  {
399    shutdownRequested = true;
400  }
401
402
403
404  /**
405   * Returns {@code true} if shutdown has been requested.
406   *
407   * @return  {@code true} if shutdown has been requested and {@code false}
408   *          otherwise.
409   */
410  public boolean isShutdownRequested()
411  {
412    return shutdownRequested;
413  }
414}