001/*
002 * Copyright 2016-2020 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2016-2020 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2016-2020 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.ldap.listener;
037
038
039
040import java.util.List;
041import java.util.concurrent.Semaphore;
042import java.util.concurrent.TimeUnit;
043
044import com.unboundid.ldap.protocol.AbandonRequestProtocolOp;
045import com.unboundid.ldap.protocol.AddRequestProtocolOp;
046import com.unboundid.ldap.protocol.AddResponseProtocolOp;
047import com.unboundid.ldap.protocol.BindRequestProtocolOp;
048import com.unboundid.ldap.protocol.BindResponseProtocolOp;
049import com.unboundid.ldap.protocol.CompareRequestProtocolOp;
050import com.unboundid.ldap.protocol.CompareResponseProtocolOp;
051import com.unboundid.ldap.protocol.DeleteRequestProtocolOp;
052import com.unboundid.ldap.protocol.DeleteResponseProtocolOp;
053import com.unboundid.ldap.protocol.ExtendedRequestProtocolOp;
054import com.unboundid.ldap.protocol.ExtendedResponseProtocolOp;
055import com.unboundid.ldap.protocol.LDAPMessage;
056import com.unboundid.ldap.protocol.ModifyRequestProtocolOp;
057import com.unboundid.ldap.protocol.ModifyResponseProtocolOp;
058import com.unboundid.ldap.protocol.ModifyDNRequestProtocolOp;
059import com.unboundid.ldap.protocol.ModifyDNResponseProtocolOp;
060import com.unboundid.ldap.protocol.SearchRequestProtocolOp;
061import com.unboundid.ldap.protocol.SearchResultDoneProtocolOp;
062import com.unboundid.ldap.sdk.Control;
063import com.unboundid.ldap.sdk.LDAPException;
064import com.unboundid.ldap.sdk.OperationType;
065import com.unboundid.ldap.sdk.ResultCode;
066import com.unboundid.util.Debug;
067import com.unboundid.util.NotMutable;
068import com.unboundid.util.StaticUtils;
069import com.unboundid.util.ThreadSafety;
070import com.unboundid.util.ThreadSafetyLevel;
071import com.unboundid.util.Validator;
072
073import static com.unboundid.ldap.listener.ListenerMessages.*;
074
075
076
077/**
078 * This class provides an implementation of an LDAP listener request handler
079 * that can be used to limit the number of requests that may be processed
080 * concurrently.  It uses one or more {@link Semaphore} instances to limit the
081 * number of requests that may be processed at any time, and provides the
082 * ability to impose limiting on a per-operation-type basis.
083 */
084@NotMutable()
085@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
086public final class ConcurrentRequestLimiterRequestHandler
087       extends LDAPListenerRequestHandler
088{
089  // The downstream request handler that will be used to process the requests
090  // after any appropriate concurrent request limiting has been performed.
091  private final LDAPListenerRequestHandler downstreamRequestHandler;
092
093  // A timeout value (expressed in milliseconds) that will cause the operation
094  // to be rejected rather than processed if the associated semaphore cannot be
095  // acquired in this length of time.
096  private final long rejectTimeoutMillis;
097
098  // The semaphores that will be used for each type of operation.
099  private final Semaphore abandonSemaphore;
100  private final Semaphore addSemaphore;
101  private final Semaphore bindSemaphore;
102  private final Semaphore compareSemaphore;
103  private final Semaphore deleteSemaphore;
104  private final Semaphore extendedSemaphore;
105  private final Semaphore modifySemaphore;
106  private final Semaphore modifyDNSemaphore;
107  private final Semaphore searchSemaphore;
108
109
110
111  /**
112   * Creates a new concurrent request limiter request handler that will impose
113   * the specified limit on the number of operations that may be in progress at
114   * any time.  The limit will be enforced for all types of operations except
115   * abandon and unbind operations, which will not be limited.
116   *
117   * @param  downstreamRequestHandler  The downstream request handler that will
118   *                                   be used to actually process the requests
119   *                                   after any appropriate limiting has been
120   *                                   performed.
121   * @param  maxConcurrentRequests     The maximum number of requests that may
122   *                                   be processed at any given time.  This
123   *                                   limit will be enforced for all operation
124   *                                   types except abandon and unbind, which
125   *                                   will not be limited.
126   * @param  rejectTimeoutMillis       A timeout value (expressed in
127   *                                   milliseconds) that will cause a requested
128   *                                   operation to be rejected rather than
129   *                                   processed if the associate semaphore
130   *                                   cannot be acquired in this length of
131   *                                   time.  A value of zero indicates that the
132   *                                   operation should be rejected immediately
133   *                                   if the maximum number of concurrent
134   *                                   requests are already in progress.  A
135   *                                   value that is less than zero indicates
136   *                                   that no timeout should be imposed and
137   *                                   that requests should be forced to wait as
138   *                                   long as necessary until they can be
139   *                                   processed.
140   */
141  public ConcurrentRequestLimiterRequestHandler(
142              final LDAPListenerRequestHandler downstreamRequestHandler,
143              final int maxConcurrentRequests, final long rejectTimeoutMillis)
144  {
145    this(downstreamRequestHandler, new Semaphore(maxConcurrentRequests),
146         rejectTimeoutMillis);
147  }
148
149
150
151  /**
152   * Creates a new concurrent request limiter request handler that will use the
153   * provided semaphore to limit on the number of operations that may be in
154   * progress at any time.  The limit will be enforced for all types of
155   * operations except abandon and unbind operations, which will not be limited.
156   *
157   * @param  downstreamRequestHandler  The downstream request handler that will
158   *                                   be used to actually process the requests
159   *                                   after any appropriate limiting has been
160   *                                   performed.
161   * @param  semaphore                 The semaphore that will be used to limit
162   *                                   the number of concurrent operations in
163   *                                   progress, for all operation types except
164   *                                   abandon and unbind.
165   * @param  rejectTimeoutMillis       A timeout value (expressed in
166   *                                   milliseconds) that will cause a requested
167   *                                   operation to be rejected rather than
168   *                                   processed if the associate semaphore
169   *                                   cannot be acquired in this length of
170   *                                   time.  A value of zero indicates that the
171   *                                   operation should be rejected immediately
172   *                                   if the maximum number of concurrent
173   *                                   requests are already in progress.  A
174   *                                   value that is less than zero indicates
175   *                                   that no timeout should be imposed and
176   *                                   that requests should be forced to wait as
177   *                                   long as necessary until they can be
178   *                                   processed.
179   */
180  public ConcurrentRequestLimiterRequestHandler(
181              final LDAPListenerRequestHandler downstreamRequestHandler,
182              final Semaphore semaphore, final long rejectTimeoutMillis)
183  {
184    this(downstreamRequestHandler, null, semaphore, semaphore, semaphore,
185         semaphore, semaphore, semaphore, semaphore, semaphore,
186         rejectTimeoutMillis);
187  }
188
189
190
191  /**
192   * Creates a new concurrent request limiter request handler that can use the
193   * provided semaphore instances to limit the number of operations in progress
194   * concurrently for each type of operation.  The same semaphore instance can
195   * be provided for multiple operation types if performance for those
196   * operations should be limited in aggregate rather than individually (e.g.,
197   * if you don't want the total combined number of search and modify operations
198   * in progress at any time to exceed a given threshold, then you could provide
199   * the same semaphore instance for the {@code modifySemaphore} and
200   * {@code searchSemaphore} arguments).
201   *
202   * @param  downstreamRequestHandler  The downstream request handler that will
203   *                                   be used to actually process the requests
204   *                                   after any appropriate rate limiting has
205   *                                   been performed.  It must not be
206   *                                   {@code null}.
207   * @param  abandonSemaphore          The semaphore to use when processing
208   *                                   abandon operations.  It may be
209   *                                   {@code null} if no concurrent request
210   *                                   limiting should be performed for abandon
211   *                                   operations.
212   * @param  addSemaphore              The semaphore to use when processing add
213   *                                   operations.  It may be {@code null} if no
214   *                                   concurrent request limiting should be
215   *                                   performed for add operations.
216   * @param  bindSemaphore             The semaphore to use when processing
217   *                                   bind operations.  It may be
218   *                                   {@code null} if no concurrent request
219   *                                   limiting should be performed for bind
220   *                                   operations.
221   * @param  compareSemaphore          The semaphore to use when processing
222   *                                   compare operations.  It may be
223   *                                   {@code null} if no concurrent request
224   *                                   limiting should be performed for compare
225   *                                   operations.
226   * @param  deleteSemaphore           The semaphore to use when processing
227   *                                   delete operations.  It may be
228   *                                   {@code null} if no concurrent request
229   *                                   limiting should be performed for delete
230   *                                   operations.
231   * @param  extendedSemaphore         The semaphore to use when processing
232   *                                   extended operations.  It may be
233   *                                   {@code null} if no concurrent request
234   *                                   limiting should be performed for extended
235   *                                   operations.
236   * @param  modifySemaphore           The semaphore to use when processing
237   *                                   modify operations.  It may be
238   *                                   {@code null} if no concurrent request
239   *                                   limiting should be performed for modify
240   *                                   operations.
241   * @param  modifyDNSemaphore         The semaphore to use when processing
242   *                                   modify DN operations.  It may be
243   *                                   {@code null} if no concurrent request
244   *                                   limiting should be performed for modify
245   *                                   DN operations.
246   * @param  searchSemaphore           The semaphore to use when processing
247   *                                   search operations.  It may be
248   *                                   {@code null} if no concurrent request
249   *                                   limiting should be performed for search
250   *                                   operations.
251   * @param  rejectTimeoutMillis       A timeout value (expressed in
252   *                                   milliseconds) that will cause a requested
253   *                                   operation to be rejected rather than
254   *                                   processed if the associate semaphore
255   *                                   cannot be acquired in this length of
256   *                                   time.  A value of zero indicates that the
257   *                                   operation should be rejected immediately
258   *                                   if the maximum number of concurrent
259   *                                   requests are already in progress.  A
260   *                                   value that is less than zero indicates
261   *                                   that no timeout should be imposed and
262   *                                   that requests should be forced to wait as
263   *                                   long as necessary until they can be
264   *                                   processed.
265   */
266  public ConcurrentRequestLimiterRequestHandler(
267              final LDAPListenerRequestHandler downstreamRequestHandler,
268              final Semaphore abandonSemaphore,
269              final Semaphore addSemaphore,
270              final Semaphore bindSemaphore,
271              final Semaphore compareSemaphore,
272              final Semaphore deleteSemaphore,
273              final Semaphore extendedSemaphore,
274              final Semaphore modifySemaphore,
275              final Semaphore modifyDNSemaphore,
276              final Semaphore searchSemaphore,
277              final long rejectTimeoutMillis)
278  {
279    Validator.ensureNotNull(downstreamRequestHandler);
280
281    this.downstreamRequestHandler = downstreamRequestHandler;
282    this.abandonSemaphore         = abandonSemaphore;
283    this.addSemaphore             = addSemaphore;
284    this.bindSemaphore            = bindSemaphore;
285    this.compareSemaphore         = compareSemaphore;
286    this.deleteSemaphore          = deleteSemaphore;
287    this.extendedSemaphore        = extendedSemaphore;
288    this.modifySemaphore          = modifySemaphore;
289    this.modifyDNSemaphore        = modifyDNSemaphore;
290    this.searchSemaphore          = searchSemaphore;
291
292    if (rejectTimeoutMillis >= 0L)
293    {
294      this.rejectTimeoutMillis = rejectTimeoutMillis;
295    }
296    else
297    {
298      this.rejectTimeoutMillis = (long) Integer.MAX_VALUE;
299    }
300  }
301
302
303
304  /**
305   * {@inheritDoc}
306   */
307  @Override()
308  public ConcurrentRequestLimiterRequestHandler newInstance(
309              final LDAPListenerClientConnection connection)
310         throws LDAPException
311  {
312    return new ConcurrentRequestLimiterRequestHandler(
313         downstreamRequestHandler.newInstance(connection), abandonSemaphore,
314         addSemaphore, bindSemaphore, compareSemaphore, deleteSemaphore,
315         extendedSemaphore, modifySemaphore, modifyDNSemaphore,
316         searchSemaphore, rejectTimeoutMillis);
317  }
318
319
320
321  /**
322   * {@inheritDoc}
323   */
324  @Override()
325  public void processAbandonRequest(final int messageID,
326                                    final AbandonRequestProtocolOp request,
327                                    final List<Control> controls)
328  {
329    try
330    {
331      acquirePermit(abandonSemaphore, OperationType.ABANDON);
332    }
333    catch (final LDAPException le)
334    {
335      Debug.debugException(le);
336      return;
337    }
338
339    try
340    {
341      downstreamRequestHandler.processAbandonRequest(messageID, request,
342           controls);
343    }
344    finally
345    {
346      releasePermit(abandonSemaphore);
347    }
348  }
349
350
351
352  /**
353   * {@inheritDoc}
354   */
355  @Override()
356  public LDAPMessage processAddRequest(final int messageID,
357                                       final AddRequestProtocolOp request,
358                                       final List<Control> controls)
359  {
360    try
361    {
362      acquirePermit(addSemaphore, OperationType.ADD);
363    }
364    catch (final LDAPException le)
365    {
366      Debug.debugException(le);
367      return new LDAPMessage(messageID,
368           new AddResponseProtocolOp(le.toLDAPResult()));
369    }
370
371    try
372    {
373      return downstreamRequestHandler.processAddRequest(messageID, request,
374           controls);
375    }
376    finally
377    {
378      releasePermit(addSemaphore);
379    }
380  }
381
382
383
384  /**
385   * {@inheritDoc}
386   */
387  @Override()
388  public LDAPMessage processBindRequest(final int messageID,
389                                        final BindRequestProtocolOp request,
390                                        final List<Control> controls)
391  {
392    try
393    {
394      acquirePermit(bindSemaphore, OperationType.BIND);
395    }
396    catch (final LDAPException le)
397    {
398      Debug.debugException(le);
399      return new LDAPMessage(messageID,
400           new BindResponseProtocolOp(le.toLDAPResult()));
401    }
402
403    try
404    {
405      return downstreamRequestHandler.processBindRequest(messageID, request,
406           controls);
407    }
408    finally
409    {
410      releasePermit(bindSemaphore);
411    }
412  }
413
414
415
416  /**
417   * {@inheritDoc}
418   */
419  @Override()
420  public LDAPMessage processCompareRequest(final int messageID,
421                          final CompareRequestProtocolOp request,
422                          final List<Control> controls)
423  {
424    try
425    {
426      acquirePermit(compareSemaphore, OperationType.COMPARE);
427    }
428    catch (final LDAPException le)
429    {
430      Debug.debugException(le);
431      return new LDAPMessage(messageID,
432           new CompareResponseProtocolOp(le.toLDAPResult()));
433    }
434
435    try
436    {
437      return downstreamRequestHandler.processCompareRequest(messageID, request,
438           controls);
439    }
440    finally
441    {
442      releasePermit(compareSemaphore);
443    }
444  }
445
446
447
448  /**
449   * {@inheritDoc}
450   */
451  @Override()
452  public LDAPMessage processDeleteRequest(final int messageID,
453                                          final DeleteRequestProtocolOp request,
454                                          final List<Control> controls)
455  {
456    try
457    {
458      acquirePermit(deleteSemaphore, OperationType.DELETE);
459    }
460    catch (final LDAPException le)
461    {
462      Debug.debugException(le);
463      return new LDAPMessage(messageID,
464           new DeleteResponseProtocolOp(le.toLDAPResult()));
465    }
466
467    try
468    {
469      return downstreamRequestHandler.processDeleteRequest(messageID, request,
470           controls);
471    }
472    finally
473    {
474      releasePermit(deleteSemaphore);
475    }
476  }
477
478
479
480  /**
481   * {@inheritDoc}
482   */
483  @Override()
484  public LDAPMessage processExtendedRequest(final int messageID,
485                          final ExtendedRequestProtocolOp request,
486                          final List<Control> controls)
487  {
488    try
489    {
490      acquirePermit(extendedSemaphore, OperationType.EXTENDED);
491    }
492    catch (final LDAPException le)
493    {
494      Debug.debugException(le);
495      return new LDAPMessage(messageID,
496           new ExtendedResponseProtocolOp(le.toLDAPResult()));
497    }
498
499    try
500    {
501      return downstreamRequestHandler.processExtendedRequest(messageID, request,
502           controls);
503    }
504    finally
505    {
506      releasePermit(extendedSemaphore);
507    }
508  }
509
510
511
512  /**
513   * {@inheritDoc}
514   */
515  @Override()
516  public LDAPMessage processModifyRequest(final int messageID,
517                                          final ModifyRequestProtocolOp request,
518                                          final List<Control> controls)
519  {
520    try
521    {
522      acquirePermit(modifySemaphore, OperationType.MODIFY);
523    }
524    catch (final LDAPException le)
525    {
526      Debug.debugException(le);
527      return new LDAPMessage(messageID,
528           new ModifyResponseProtocolOp(le.toLDAPResult()));
529    }
530
531    try
532    {
533      return downstreamRequestHandler.processModifyRequest(messageID, request,
534           controls);
535    }
536    finally
537    {
538      releasePermit(modifySemaphore);
539    }
540  }
541
542
543
544  /**
545   * {@inheritDoc}
546   */
547  @Override()
548  public LDAPMessage processModifyDNRequest(final int messageID,
549                          final ModifyDNRequestProtocolOp request,
550                          final List<Control> controls)
551  {
552    try
553    {
554      acquirePermit(modifyDNSemaphore, OperationType.MODIFY_DN);
555    }
556    catch (final LDAPException le)
557    {
558      Debug.debugException(le);
559      return new LDAPMessage(messageID,
560           new ModifyDNResponseProtocolOp(le.toLDAPResult()));
561    }
562
563    try
564    {
565      return downstreamRequestHandler.processModifyDNRequest(messageID, request,
566           controls);
567    }
568    finally
569    {
570      releasePermit(modifyDNSemaphore);
571    }
572  }
573
574
575
576  /**
577   * {@inheritDoc}
578   */
579  @Override()
580  public LDAPMessage processSearchRequest(final int messageID,
581                                          final SearchRequestProtocolOp request,
582                                          final List<Control> controls)
583  {
584    try
585    {
586      acquirePermit(searchSemaphore, OperationType.SEARCH);
587    }
588    catch (final LDAPException le)
589    {
590      Debug.debugException(le);
591      return new LDAPMessage(messageID,
592           new SearchResultDoneProtocolOp(le.toLDAPResult()));
593    }
594
595    try
596    {
597      return downstreamRequestHandler.processSearchRequest(messageID, request,
598           controls);
599    }
600    finally
601    {
602      releasePermit(searchSemaphore);
603    }
604  }
605
606
607
608  /**
609   * Acquires a permit from the provided semaphore.
610   *
611   * @param  semaphore      The semaphore from which to acquire a permit.  It
612   *                        may be {@code null} if no semaphore is needed for
613   *                        the associated operation type.
614   * @param  operationType  The type of operation
615   *
616   * @throws  LDAPException  If it was not possible to acquire a permit from the
617   *                         provided semaphore.
618   */
619  private void acquirePermit(final Semaphore semaphore,
620                             final OperationType operationType)
621          throws LDAPException
622  {
623    if (semaphore == null)
624    {
625      return;
626    }
627
628    try
629    {
630      if (rejectTimeoutMillis == 0L)
631      {
632        if (! semaphore.tryAcquire())
633        {
634          throw new LDAPException(ResultCode.BUSY,
635               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_NO_TIMEOUT.get(
636                    operationType.name()));
637        }
638      }
639      else
640      {
641        if (! semaphore.tryAcquire(rejectTimeoutMillis, TimeUnit.MILLISECONDS))
642        {
643          throw new LDAPException(ResultCode.BUSY,
644               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_TIMEOUT.get(
645                    operationType.name(), rejectTimeoutMillis));
646        }
647      }
648    }
649    catch (final LDAPException le)
650    {
651      throw le;
652    }
653    catch (final Exception e)
654    {
655      Debug.debugException(e);
656      throw new LDAPException(ResultCode.OTHER,
657           ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_SEMAPHORE_EXCEPTION.get(
658                operationType.name(), StaticUtils.getExceptionMessage(e)),
659           e);
660    }
661  }
662
663
664
665  /**
666   * Releases a permit back to the provided semaphore.
667   *
668   * @param  semaphore  The semaphore to which the permit should be released.
669   *                    It may be {@code null} if no semaphore is needed for the
670   *                    associated operation type.
671   */
672  private static void releasePermit(final Semaphore semaphore)
673  {
674    if (semaphore != null)
675    {
676      semaphore.release();
677    }
678  }
679}