001/*
002 * Copyright 2011-2019 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2011-2019 Ping Identity Corporation
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.ByteArrayInputStream;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.InputStream;
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Iterator;
033
034import static com.unboundid.util.UtilityMessages.*;
035
036
037
038/**
039 * This class provides an input stream implementation that can aggregate
040 * multiple input streams.  When reading data from this input stream, it will
041 * read from the first input stream until the end of it is reached, at point it
042 * will close it and start reading from the next one, and so on until all input
043 * streams have been exhausted.  Closing the aggregate input stream will cause
044 * all remaining input streams to be closed.
045 */
046@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
047public final class AggregateInputStream
048       extends InputStream
049{
050  // The currently-active input stream.
051  private volatile InputStream activeInputStream;
052
053  // The iterator that will be used to access the input streams.
054  private final Iterator<InputStream> streamIterator;
055
056
057
058  /**
059   * Creates a new aggregate input stream that will use the provided set of
060   * input streams.
061   *
062   * @param  inputStreams  The input streams to be used by this aggregate input
063   *                       stream.  It must not be {@code null}.
064   */
065  public AggregateInputStream(final InputStream... inputStreams)
066  {
067    this(StaticUtils.toList(inputStreams));
068  }
069
070
071
072  /**
073   * Creates a new aggregate input stream that will use the provided set of
074   * input streams.
075   *
076   * @param  inputStreams  The input streams to be used by this aggregate input
077   *                       stream.  It must not be {@code null}.
078   */
079  public AggregateInputStream(
080              final Collection<? extends InputStream> inputStreams)
081  {
082    Validator.ensureNotNull(inputStreams);
083
084    final ArrayList<InputStream> streamList = new ArrayList<>(inputStreams);
085    streamIterator = streamList.iterator();
086    activeInputStream = null;
087  }
088
089
090
091  /**
092   * Creates a new aggregate input stream that will read data from the specified
093   * files.
094   *
095   * @param  files  The set of files to be read by this aggregate input stream.
096   *                It must not be {@code null}.
097   *
098   * @throws  IOException  If a problem is encountered while attempting to
099   *                       create input streams for the provided files.
100   */
101  public AggregateInputStream(final File... files)
102         throws IOException
103  {
104    this(false, files);
105  }
106
107
108
109  /**
110   * Creates a new aggregate input stream that will read data from the specified
111   * files.
112   *
113   * @param  ensureBlankLinesBetweenFiles  Indicates whether to ensure that
114   *                                       there is at least one completely
115   *                                       blank line between files.  This may
116   *                                       be useful when blank lines are
117   *                                       used as delimiters (for example, when
118   *                                       reading LDIF data), there is a chance
119   *                                       that the files may not end with blank
120   *                                       lines, and the inclusion of extra
121   *                                       blank lines between files will not
122   *                                       cause any harm.
123   * @param  files                         The set of files to be read by this
124   *                                       aggregate input stream.  It must not
125   *                                       be {@code null}.
126   *
127   * @throws  IOException  If a problem is encountered while attempting to
128   *                       create input streams for the provided files.
129   */
130  public AggregateInputStream(final boolean ensureBlankLinesBetweenFiles,
131                              final File... files)
132         throws IOException
133  {
134    Validator.ensureNotNull(files);
135
136    final ArrayList<InputStream> streamList = new ArrayList<>(2 * files.length);
137
138    IOException ioException = null;
139    for (final File f : files)
140    {
141      if (ensureBlankLinesBetweenFiles && (! streamList.isEmpty()))
142      {
143        final ByteStringBuffer buffer = new ByteStringBuffer(4);
144        buffer.append(StaticUtils.EOL_BYTES);
145        buffer.append(StaticUtils.EOL_BYTES);
146        streamList.add(new ByteArrayInputStream(buffer.toByteArray()));
147      }
148
149      try
150      {
151        streamList.add(new FileInputStream(f));
152      }
153      catch (final IOException ioe)
154      {
155        Debug.debugException(ioe);
156        ioException = ioe;
157        break;
158      }
159    }
160
161    if (ioException != null)
162    {
163      for (final InputStream s : streamList)
164      {
165        if (s != null)
166        {
167          try
168          {
169            s.close();
170          }
171          catch (final Exception e)
172          {
173            Debug.debugException(e);
174          }
175        }
176      }
177
178      throw ioException;
179    }
180
181    streamIterator = streamList.iterator();
182    activeInputStream = null;
183  }
184
185
186
187  /**
188   * Reads the next byte of data from the current active input stream, switching
189   * to the next input stream in the set if appropriate.
190   *
191   * @return  The next byte of data that was read, or -1 if all streams have
192   *          been exhausted.
193   *
194   * @throws  IOException  If a problem is encountered while attempting to read
195   *                       data from an input stream.
196   */
197  @Override()
198  public int read()
199         throws IOException
200  {
201    while (true)
202    {
203      if (activeInputStream == null)
204      {
205        if (streamIterator.hasNext())
206        {
207          activeInputStream = streamIterator.next();
208          continue;
209        }
210        else
211        {
212          return -1;
213        }
214      }
215
216      final int byteRead = activeInputStream.read();
217      if (byteRead < 0)
218      {
219        activeInputStream.close();
220        activeInputStream = null;
221      }
222      else
223      {
224        return byteRead;
225      }
226    }
227  }
228
229
230
231  /**
232   * Reads data from the current active input stream into the provided array,
233   * switching to the next input stream in the set if appropriate.
234   *
235   * @param  b  The array into which the data read should be placed, starting
236   *            with an index of zero.  It must not be {@code null}.
237   *
238   * @return  The number of bytes read into the array, or -1 if all streams have
239   *          been exhausted.
240   *
241   * @throws  IOException  If a problem is encountered while attempting to read
242   *                       data from an input stream.
243   */
244  @Override()
245  public int read(final byte[] b)
246         throws IOException
247  {
248    return read(b, 0, b.length);
249  }
250
251
252
253  /**
254   * Reads data from the current active input stream into the provided array,
255   * switching to the next input stream in the set if appropriate.
256   *
257   * @param  b    The array into which the data read should be placed.  It must
258   *              not be {@code null}.
259   * @param  off  The position in the array at which to start writing data.
260   * @param  len  The maximum number of bytes that may be read.
261   *
262   * @return  The number of bytes read into the array, or -1 if all streams have
263   *          been exhausted.
264   *
265   * @throws  IOException  If a problem is encountered while attempting to read
266   *                       data from an input stream.
267   */
268  @Override()
269  public int read(final byte[] b, final int off, final int len)
270         throws IOException
271  {
272    while (true)
273    {
274      if (activeInputStream == null)
275      {
276        if (streamIterator.hasNext())
277        {
278          activeInputStream = streamIterator.next();
279          continue;
280        }
281        else
282        {
283          return -1;
284        }
285      }
286
287      final int bytesRead = activeInputStream.read(b, off, len);
288      if (bytesRead < 0)
289      {
290        activeInputStream.close();
291        activeInputStream = null;
292      }
293      else
294      {
295        return bytesRead;
296      }
297    }
298  }
299
300
301
302  /**
303   * Attempts to skip and discard up to the specified number of bytes from the
304   * input stream.
305   *
306   * @param  n  The number of bytes to attempt to skip.
307   *
308   * @return  The number of bytes actually skipped.
309   *
310   * @throws  IOException  If a problem is encountered while attempting to skip
311   *                       data from the input stream.
312   */
313  @Override()
314  public long skip(final long n)
315         throws IOException
316  {
317    if (activeInputStream == null)
318    {
319      if (streamIterator.hasNext())
320      {
321        activeInputStream = streamIterator.next();
322        return activeInputStream.skip(n);
323      }
324      else
325      {
326        return 0L;
327      }
328    }
329    else
330    {
331      return activeInputStream.skip(n);
332    }
333  }
334
335
336
337  /**
338   * Retrieves an estimate of the number of bytes that can be read without
339   * blocking.
340   *
341   * @return  An estimate of the number of bytes that can be read without
342   *          blocking.
343   *
344   * @throws  IOException  If a problem is encountered while attempting to make
345   *                       the determination.
346   */
347  @Override()
348  public int available()
349         throws IOException
350  {
351    if (activeInputStream == null)
352    {
353      if (streamIterator.hasNext())
354      {
355        activeInputStream = streamIterator.next();
356        return activeInputStream.available();
357      }
358      else
359      {
360        return 0;
361      }
362    }
363    else
364    {
365      return activeInputStream.available();
366    }
367  }
368
369
370
371  /**
372   * Indicates whether this input stream supports the use of the {@code mark}
373   * and {@code reset} methods.  This implementation does not support that
374   * capability.
375   *
376   * @return  {@code false} to indicate that this input stream implementation
377   *          does not support the use of {@code mark} and {@code reset}.
378   */
379  @Override()
380  public boolean markSupported()
381  {
382    return false;
383  }
384
385
386
387  /**
388   * Marks the current position in the input stream.  This input stream does not
389   * support this functionality, so no action will be taken.
390   *
391   * @param  readLimit  The maximum number of bytes that the caller may wish to
392   *                    read before being able to reset the stream.
393   */
394  @Override()
395  public void mark(final int readLimit)
396  {
397    // No implementation is required.
398  }
399
400
401
402  /**
403   * Attempts to reset the position of this input stream to the mark location.
404   * This implementation does not support {@code mark} and {@code reset}
405   * functionality, so this method will always throw an exception.
406   *
407   * @throws  IOException  To indicate that reset is not supported.
408   */
409  @Override()
410  public void reset()
411         throws IOException
412  {
413    throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
414  }
415
416
417
418  /**
419   * Closes this input stream.  All associated input streams will be closed.
420   *
421   * @throws  IOException  If an exception was encountered while attempting to
422   *                       close any of the associated streams.  Note that even
423   *                       if an exception is encountered, an attempt will be
424   *                       made to close all streams.
425   */
426  @Override()
427  public void close()
428         throws IOException
429  {
430    IOException firstException = null;
431
432    if (activeInputStream != null)
433    {
434      try
435      {
436        activeInputStream.close();
437      }
438      catch (final IOException ioe)
439      {
440        Debug.debugException(ioe);
441        firstException = ioe;
442      }
443      activeInputStream = null;
444    }
445
446    while (streamIterator.hasNext())
447    {
448      final InputStream s = streamIterator.next();
449      try
450      {
451        s.close();
452      }
453      catch (final IOException ioe)
454      {
455        Debug.debugException(ioe);
456        if (firstException == null)
457        {
458          firstException = ioe;
459        }
460      }
461    }
462
463    if (firstException != null)
464    {
465      throw firstException;
466    }
467  }
468}