/*
 * Copyright 2018 Ping Identity Corporation
 * All Rights Reserved.
 */
/*
 * Copyright (C) 2018 Ping Identity Corporation
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License (GPLv2 only)
 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses>.
 */
package com.unboundid.util;



import java.io.InputStream;
import java.io.IOException;



/**
 * This class provides an {@code InputStream} implementation that uses a
 * {@link FixedRateBarrier} to impose an upper bound on the rate (in bytes per
 * second) at which data can be read from a wrapped {@code InputStream}.
 * <BR><BR>
 * Every read first waits on the rate limiter for the number of bytes that are
 * about to be requested and then delegates to the wrapped stream.  Requests
 * that are larger than an internal per-read cap are split into multiple
 * smaller reads so that no single call to the rate limiter asks for more than
 * that cap.  All other operations are delegated directly to the wrapped
 * stream.
 */
@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
public final class RateLimitedInputStream
       extends InputStream
{
  // The fixed-rate barrier that will serve as a rate limiter for this class.
  private final FixedRateBarrier rateLimiter;

  // The input stream from which the data will actually be read.
  private final InputStream wrappedStream;

  // The maximum number of bytes that can be read in any single call to the
  // rate limiter.
  private final int maxBytesPerRead;



  /**
   * Creates a new instance of this rate-limited input stream that wraps the
   * provided input stream.
   *
   * @param  wrappedStream      The input stream from which the data will
   *                            actually be read.  It must not be {@code null}.
   * @param  maxBytesPerSecond  The maximum number of bytes per second that can
   *                            be read using this input stream.  It must be
   *                            greater than zero.
   */
  public RateLimitedInputStream(final InputStream wrappedStream,
                                final int maxBytesPerSecond)
  {
    Validator.ensureTrue((wrappedStream != null),
         "RateLimitedInputStream.wrappedStream must not be null.");
    Validator.ensureTrue((maxBytesPerSecond > 0),
         "RateLimitedInputStream.maxBytesPerSecond must be greater than " +
              "zero.  The provided value was " + maxBytesPerSecond);

    this.wrappedStream = wrappedStream;

    // The barrier is configured with an interval of 1000 (presumably
    // milliseconds, i.e., one second) and a per-interval allowance of
    // maxBytesPerSecond.
    rateLimiter = new FixedRateBarrier(1000L, maxBytesPerSecond);

    // Cap each individual request to the rate limiter at one hundredth of the
    // per-second allowance, but never less than one byte.
    maxBytesPerRead = Math.max(1, (maxBytesPerSecond / 100));
  }



  /**
   * Closes this input stream and the wrapped stream.
   *
   * @throws  IOException  If a problem is encountered while closing the wrapped
   *                       input stream.
   */
  @Override()
  public void close()
         throws IOException
  {
    wrappedStream.close();
  }



  /**
   * Reads a single byte of input from the wrapped input stream.
   *
   * @return  The byte that was read, or -1 if the end of the input stream has
   *          been reached.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from the underlying input stream.
   */
  @Override()
  public int read()
         throws IOException
  {
    rateLimiter.await();
    return wrappedStream.read();
  }



  /**
   * Reads data from the wrapped input stream into the provided array.
   *
   * @param  b  The array into which the data will be placed.
   *
   * @return  The number of bytes that were read, or -1 if the end of the input
   *          stream has been reached.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from the underlying input stream.
   */
  @Override()
  public int read(final byte[] b)
         throws IOException
  {
    return read(b, 0, b.length);
  }



  /**
   * Reads data from the wrapped input stream into the specified portion of the
   * provided array.  If the requested length is larger than the maximum number
   * of bytes allowed per call to the rate limiter, then the request will be
   * satisfied with multiple smaller reads, and this method will keep reading
   * until either the requested number of bytes has been read or the end of the
   * wrapped stream has been reached.
   *
   * @param  b       The array into which the data will be placed.
   * @param  offset  The index into the provided array at which the data should
   *                 start being added.
   * @param  length  The maximum number of bytes to be added into the array.
   *
   * @return  The number of bytes that were read, zero if the provided length
   *          is less than or equal to zero, or -1 if the end of the input
   *          stream has been reached without reading any data.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from the underlying input stream.
   */
  @Override()
  public int read(final byte[] b, final int offset, final int length)
         throws IOException
  {
    if (length <= 0)
    {
      return 0;
    }

    if (length <= maxBytesPerRead)
    {
      // The whole request fits in a single rate limiter call.
      rateLimiter.await(length);
      return wrappedStream.read(b, offset, length);
    }
    else
    {
      // The request is too large for a single rate limiter call, so break it
      // up into chunks of at most maxBytesPerRead bytes.
      int pos = offset;
      int remainingLength = length;
      int totalBytesRead = 0;
      while (remainingLength > 0)
      {
        final int lengthThisRead = Math.min(remainingLength, maxBytesPerRead);
        rateLimiter.await(lengthThisRead);
        final int bytesRead = wrappedStream.read(b, pos, lengthThisRead);
        if (bytesRead < 0)
        {
          // The end of the wrapped stream has been reached.  If this call has
          // not read anything, then we must report end-of-stream (-1) rather
          // than zero.  Otherwise, a caller that loops until read returns -1
          // would never terminate.
          if (totalBytesRead == 0)
          {
            return -1;
          }

          break;
        }

        pos += bytesRead;
        totalBytesRead += bytesRead;
        remainingLength -= bytesRead;
      }

      return totalBytesRead;
    }
  }



  /**
   * Retrieves the number of bytes that are immediately available to be read,
   * if the wrapped stream supports this operation.
   *
   * @return  The number of bytes that are immediately available to be read, or
   *          zero if there are no bytes to be read, if the end of the input
   *          stream has been reached, or if the wrapped input stream does not
   *          support this operation.
   *
   * @throws  IOException  If a problem is encountered while attempting to
   *                       determine the number of bytes available from the
   *                       wrapped input stream.
   */
  @Override()
  public int available()
         throws IOException
  {
    return wrappedStream.available();
  }



  /**
   * Indicates whether this {@code InputStream} implementation supports the use
   * of the {@link #mark(int)} and {@link #reset()} methods.  This
   * implementation will support those methods if the wrapped stream supports
   * them.
   *
   * @return  {@code true} if this {@code InputStream} supports the
   *          {@code mark} and {@code reset} methods, or {@code false} if not.
   */
  @Override()
  public boolean markSupported()
  {
    return wrappedStream.markSupported();
  }



  /**
   * Attempts to mark the current position in the wrapped input stream so that
   * it can optionally be reset after some amount of data has been read.
   *
   * @param  readLimit  The maximum number of bytes expected to be read before a
   *                    call to the {@link #reset()} method.  If more than this
   *                    number of bytes is read, then the mark may no longer be
   *                    honored.
   */
  @Override()
  public void mark(final int readLimit)
  {
    wrappedStream.mark(readLimit);
  }



  /**
   * Attempts to reset the position of this input stream to the last mark
   * position.
   *
   * @throws  IOException  If the input stream cannot be repositioned to the
   *                       marked location, or if no mark has been set.
   */
  @Override()
  public void reset()
         throws IOException
  {
    wrappedStream.reset();
  }
}