/*
 * Copyright 2011-2018 Ping Identity Corporation
 * All Rights Reserved.
 */
/*
 * Copyright (C) 2011-2018 Ping Identity Corporation
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License (GPLv2 only)
 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses>.
 */
package com.unboundid.util;



import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import static com.unboundid.util.UtilityMessages.*;



/**
 * This class provides an input stream implementation that can aggregate
 * multiple input streams.  When reading data from this input stream, it will
 * read from the first input stream until the end of it is reached, at which
 * point it will close it and start reading from the next one, and so on until
 * all input streams have been exhausted.  Closing the aggregate input stream
 * will cause all remaining input streams to be closed.
 */
@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
public final class AggregateInputStream
       extends InputStream
{
  // The currently-active input stream, or null if no stream has been activated
  // yet or the previously-active stream has been exhausted and closed.
  private volatile InputStream activeInputStream;

  // The iterator that will be used to access the input streams.  It never
  // yields null because the constructors only store non-null streams.
  private final Iterator<InputStream> streamIterator;



  /**
   * Creates a new aggregate input stream that will use the provided set of
   * input streams.
   *
   * @param  inputStreams  The input streams to be used by this aggregate input
   *                       stream.  It must not be {@code null}.  Any
   *                       {@code null} elements will be ignored.
   */
  public AggregateInputStream(final InputStream... inputStreams)
  {
    this(StaticUtils.toList(inputStreams));
  }



  /**
   * Creates a new aggregate input stream that will use the provided set of
   * input streams.
   *
   * @param  inputStreams  The input streams to be used by this aggregate input
   *                       stream.  It must not be {@code null}.  Any
   *                       {@code null} elements will be ignored.
   */
  public AggregateInputStream(
              final Collection<? extends InputStream> inputStreams)
  {
    Validator.ensureNotNull(inputStreams);

    // Drop null elements once, up front, so that read, skip, available, and
    // close all treat them the same way (they are simply ignored) instead of
    // some of those methods failing with a NullPointerException.
    final ArrayList<InputStream> streamList =
         new ArrayList<>(inputStreams.size());
    for (final InputStream s : inputStreams)
    {
      if (s != null)
      {
        streamList.add(s);
      }
    }

    streamIterator = streamList.iterator();
    activeInputStream = null;
  }



  /**
   * Creates a new aggregate input stream that will read data from the specified
   * files.
   *
   * @param  files  The set of files to be read by this aggregate input stream.
   *                It must not be {@code null}.
   *
   * @throws  IOException  If a problem is encountered while attempting to
   *                       create input streams for the provided files.
   */
  public AggregateInputStream(final File... files)
         throws IOException
  {
    this(false, files);
  }



  /**
   * Creates a new aggregate input stream that will read data from the specified
   * files.  If any of the files cannot be opened, then all streams that were
   * already opened by this constructor will be closed before the failure is
   * propagated to the caller.
   *
   * @param  ensureBlankLinesBetweenFiles  Indicates whether to ensure that
   *                                       there is at least one completely
   *                                       blank line between files.  This may
   *                                       be useful when blank lines are
   *                                       used as delimiters (for example, when
   *                                       reading LDIF data), there is a chance
   *                                       that the files may not end with blank
   *                                       lines, and the inclusion of extra
   *                                       blank lines between files will not
   *                                       cause any harm.
   * @param  files                         The set of files to be read by this
   *                                       aggregate input stream.  It must not
   *                                       be {@code null}.
   *
   * @throws  IOException  If a problem is encountered while attempting to
   *                       create input streams for the provided files.
   */
  public AggregateInputStream(final boolean ensureBlankLinesBetweenFiles,
                              final File... files)
         throws IOException
  {
    Validator.ensureNotNull(files);

    final ArrayList<InputStream> streamList = new ArrayList<>(2 * files.length);

    boolean allOpened = false;
    try
    {
      for (final File f : files)
      {
        if (ensureBlankLinesBetweenFiles && (! streamList.isEmpty()))
        {
          // Two end-of-line sequences guarantee a completely blank line even
          // if the preceding file does not end with a line break.
          final ByteStringBuffer buffer = new ByteStringBuffer(4);
          buffer.append(StaticUtils.EOL_BYTES);
          buffer.append(StaticUtils.EOL_BYTES);
          streamList.add(new ByteArrayInputStream(buffer.toByteArray()));
        }

        streamList.add(new FileInputStream(f));
      }

      allOpened = true;
    }
    catch (final IOException ioe)
    {
      Debug.debugException(ioe);
      throw ioe;
    }
    finally
    {
      // The cleanup lives in a finally block so that it also runs for
      // unchecked failures (for example, a null element in the files array or
      // a security manager rejecting the read), which would otherwise leak
      // every stream opened so far.
      if (! allOpened)
      {
        closeQuietly(streamList);
      }
    }

    streamIterator = streamList.iterator();
    activeInputStream = null;
  }



  /**
   * Closes all of the provided streams, recording (but otherwise ignoring) any
   * exception encountered along the way.
   *
   * @param  streams  The streams to be closed.  It must not be {@code null},
   *                  but it may contain {@code null} elements.
   */
  private static void closeQuietly(
               final Collection<? extends InputStream> streams)
  {
    for (final InputStream s : streams)
    {
      if (s != null)
      {
        try
        {
          s.close();
        }
        catch (final Exception e)
        {
          Debug.debugException(e);
        }
      }
    }
  }



  /**
   * Ensures that there is an active input stream if one is available.  If
   * there is no active stream and the iterator has more streams, then the next
   * one will be made active.
   *
   * @return  {@code true} if there is an active input stream after this method
   *          completes, or {@code false} if all streams have been used.
   */
  private boolean hasActiveStream()
  {
    if ((activeInputStream == null) && streamIterator.hasNext())
    {
      activeInputStream = streamIterator.next();
    }

    return (activeInputStream != null);
  }



  /**
   * Reads the next byte of data from the current active input stream, switching
   * to the next input stream in the set if appropriate.
   *
   * @return  The next byte of data that was read, or -1 if all streams have
   *          been exhausted.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from an input stream.
   */
  @Override()
  public int read()
         throws IOException
  {
    while (hasActiveStream())
    {
      final int byteRead = activeInputStream.read();
      if (byteRead >= 0)
      {
        return byteRead;
      }

      // The active stream is exhausted.  Close it and move on to the next.
      activeInputStream.close();
      activeInputStream = null;
    }

    return -1;
  }



  /**
   * Reads data from the current active input stream into the provided array,
   * switching to the next input stream in the set if appropriate.
   *
   * @param  b  The array into which the data read should be placed, starting
   *            with an index of zero.  It must not be {@code null}.
   *
   * @return  The number of bytes read into the array, or -1 if all streams have
   *          been exhausted.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from an input stream.
   */
  @Override()
  public int read(final byte[] b)
         throws IOException
  {
    return read(b, 0, b.length);
  }



  /**
   * Reads data from the current active input stream into the provided array,
   * switching to the next input stream in the set if appropriate.  A single
   * call will only return data from one of the underlying streams.
   *
   * @param  b    The array into which the data read should be placed.  It must
   *              not be {@code null}.
   * @param  off  The position in the array at which to start writing data.
   * @param  len  The maximum number of bytes that may be read.
   *
   * @return  The number of bytes read into the array, or -1 if all streams have
   *          been exhausted.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from an input stream.
   */
  @Override()
  public int read(final byte[] b, final int off, final int len)
         throws IOException
  {
    while (hasActiveStream())
    {
      final int bytesRead = activeInputStream.read(b, off, len);
      if (bytesRead >= 0)
      {
        return bytesRead;
      }

      // The active stream is exhausted.  Close it and move on to the next.
      activeInputStream.close();
      activeInputStream = null;
    }

    return -1;
  }



  /**
   * Attempts to skip and discard up to the specified number of bytes from the
   * input stream.  Only the current active input stream (or the next one, if
   * none is active yet) is consulted, so fewer bytes than requested may be
   * skipped.
   *
   * @param  n  The number of bytes to attempt to skip.
   *
   * @return  The number of bytes actually skipped.
   *
   * @throws  IOException  If a problem is encountered while attempting to skip
   *                       data from the input stream.
   */
  @Override()
  public long skip(final long n)
         throws IOException
  {
    if (hasActiveStream())
    {
      return activeInputStream.skip(n);
    }
    else
    {
      return 0L;
    }
  }



  /**
   * Retrieves an estimate of the number of bytes that can be read without
   * blocking.  Only the current active input stream (or the next one, if none
   * is active yet) is consulted.
   *
   * @return  An estimate of the number of bytes that can be read without
   *          blocking.
   *
   * @throws  IOException  If a problem is encountered while attempting to make
   *                       the determination.
   */
  @Override()
  public int available()
         throws IOException
  {
    if (hasActiveStream())
    {
      return activeInputStream.available();
    }
    else
    {
      return 0;
    }
  }



  /**
   * Indicates whether this input stream supports the use of the {@code mark}
   * and {@code reset} methods.  This implementation does not support that
   * capability.
   *
   * @return  {@code false} to indicate that this input stream implementation
   *          does not support the use of {@code mark} and {@code reset}.
   */
  @Override()
  public boolean markSupported()
  {
    return false;
  }



  /**
   * Marks the current position in the input stream.  This input stream does not
   * support this functionality, so no action will be taken.
   *
   * @param  readLimit  The maximum number of bytes that the caller may wish to
   *                    read before being able to reset the stream.
   */
  @Override()
  public void mark(final int readLimit)
  {
    // No implementation is required.
  }



  /**
   * Attempts to reset the position of this input stream to the mark location.
   * This implementation does not support {@code mark} and {@code reset}
   * functionality, so this method will always throw an exception.
   *
   * @throws  IOException  To indicate that reset is not supported.
   */
  @Override()
  public void reset()
         throws IOException
  {
    throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
  }



  /**
   * Closes this input stream.  All associated input streams will be closed.
   *
   * @throws  IOException  If an exception was encountered while attempting to
   *                       close any of the associated streams.  Note that even
   *                       if an exception is encountered, an attempt will be
   *                       made to close all streams.  If multiple streams fail
   *                       to close, then the first exception encountered will
   *                       be thrown.
   */
  @Override()
  public void close()
         throws IOException
  {
    IOException firstException = null;

    if (activeInputStream != null)
    {
      try
      {
        activeInputStream.close();
      }
      catch (final IOException ioe)
      {
        Debug.debugException(ioe);
        firstException = ioe;
      }
      activeInputStream = null;
    }

    // Drain the iterator so that streams which were never activated are
    // closed too.
    while (streamIterator.hasNext())
    {
      final InputStream s = streamIterator.next();
      try
      {
        s.close();
      }
      catch (final IOException ioe)
      {
        Debug.debugException(ioe);
        if (firstException == null)
        {
          firstException = ioe;
        }
      }
    }

    if (firstException != null)
    {
      throw firstException;
    }
  }
}