/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

// import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.build.AbstractStreamBuilder;

/**
 * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current
 * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a
 * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we
 * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O.
 * <p>
 * The implementation assumes a single reader thread and a single read-ahead task at a time.
 * </p>
 * <p>
 * To build an instance, see {@link Builder}.
 * </p>
 * <p>
 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19.
 * </p>
 *
 * @since 2.9.0
 */
public class ReadAheadInputStream extends FilterInputStream {

    /**
     * Builds a new {@link ReadAheadInputStream} instance.
     * <p>
     * For example:
     * </p>
     * <pre>{@code
     * ReadAheadInputStream s = ReadAheadInputStream.builder()
     *   .setPath(path)
     *   .setExecutorService(Executors.newSingleThreadExecutor())
     *   .get();}
     * </pre>
     *
     * @since 2.12.0
     */
    public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> {

        private ExecutorService executorService;

        /**
         * Builds a new {@link ReadAheadInputStream}.
         * <p>
         * This builder uses the following aspects: InputStream, OpenOption[], buffer size, ExecutorService.
         * </p>
         * <p>
         * You must provide an origin that can be converted to an InputStream by this builder, otherwise, this call will throw an
         * {@link UnsupportedOperationException}.
         * </p>
         * <p>
         * If no executor service was set, a new single-thread executor using daemon threads is created for the stream, and it is shut down when the
         * stream is closed.
         * </p>
         *
         * @return a new instance.
         * @throws UnsupportedOperationException if the origin cannot provide an InputStream.
         * @see #getInputStream()
         */
        @SuppressWarnings("resource")
        @Override
        public ReadAheadInputStream get() throws IOException {
            // Only shut down the executor on close if we created it here.
            return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(),
                    executorService == null);
        }

        /**
         * Sets the executor service for the read-ahead thread.
         * <p>
         * A supplied executor service is not shut down when the stream is closed; the caller remains responsible for it.
         * </p>
         *
         * @param executorService the executor service for the read-ahead thread.
         * @return this
         */
        public Builder setExecutorService(final ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

    }

    /** Per-thread scratch array used by {@link #read()} to delegate to {@link #read(byte[], int, int)}. */
    private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]);

    /**
     * Constructs a new {@link Builder}.
     *
     * @return a new {@link Builder}.
     * @since 2.12.0
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Constructs a new daemon thread.
     *
     * @param r the thread's runnable.
     * @return a new daemon thread.
     */
    private static Thread newDaemonThread(final Runnable r) {
        final Thread thread = new Thread(r, "commons-io-read-ahead");
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Constructs a new single-thread executor service whose threads are daemon threads.
     *
     * @return a new daemon executor service.
     */
    private static ExecutorService newExecutorService() {
        return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread);
    }

    private final ReentrantLock stateChangeLock = new ReentrantLock();

    // @GuardedBy("stateChangeLock")
    private ByteBuffer activeBuffer;

    // @GuardedBy("stateChangeLock")
    private ByteBuffer readAheadBuffer;

    // @GuardedBy("stateChangeLock")
    private boolean endOfStream;

    // @GuardedBy("stateChangeLock")
    // true if async read is in progress
    private boolean readInProgress;

    // @GuardedBy("stateChangeLock")
    // true if read is aborted due to an exception in reading from underlying input stream.
    private boolean readAborted;

    // @GuardedBy("stateChangeLock")
    private Throwable readException;

    // @GuardedBy("stateChangeLock")
    // whether the close method is called.
    private boolean isClosed;

    // @GuardedBy("stateChangeLock")
    // true when the close method will close the underlying input stream. This is valid only if
    // `isClosed` is true.
    private boolean isUnderlyingInputStreamBeingClosed;

    // @GuardedBy("stateChangeLock")
    // whether there is a read ahead task running,
    private boolean isReading;

    // Whether there is a reader waiting for data. Read by the read-ahead task without the lock
    // so that it can stop filling the buffer early.
    private final AtomicBoolean isWaiting = new AtomicBoolean(false);

    private final ExecutorService executorService;

    private final boolean shutdownExecutorService;

    private final Condition asyncReadComplete = stateChangeLock.newCondition();

    /**
     * Constructs an instance with the specified buffer size, using an internally created executor service that is shut down on close.
     *
     * @param inputStream The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) {
        this(inputStream, bufferSizeInBytes, newExecutorService(), true);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service. The given executor service is not shut down on close.
     *
     * @param inputStream The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService An executor service for the read-ahead thread.
     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}
     */
    @Deprecated
    public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) {
        this(inputStream, bufferSizeInBytes, executorService, false);
    }

    /**
     * Constructs an instance with the specified buffer size and executor service.
     *
     * @param inputStream The underlying input stream.
     * @param bufferSizeInBytes The buffer size.
     * @param executorService An executor service for the read-ahead thread.
     * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close.
     * @throws NullPointerException if {@code inputStream} or {@code executorService} is null.
     * @throws IllegalArgumentException if {@code bufferSizeInBytes} is not positive.
     */
    private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService,
            final boolean shutdownExecutorService) {
        super(Objects.requireNonNull(inputStream, "inputStream"));
        if (bufferSizeInBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
        }
        this.executorService = Objects.requireNonNull(executorService, "executorService");
        this.shutdownExecutorService = shutdownExecutorService;
        this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
        // Start with both buffers empty (position 0, limit 0).
        this.activeBuffer.flip();
        this.readAheadBuffer.flip();
    }

    /**
     * Returns the number of bytes buffered in the active and read-ahead buffers, capped at {@link Integer#MAX_VALUE}.
     */
    @Override
    public int available() throws IOException {
        stateChangeLock.lock();
        try {
            // Sum as long and clamp to avoid integer overflow.
            return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining());
        } finally {
            stateChangeLock.unlock();
        }
    }

    /**
     * Rethrows the exception recorded by the read-ahead task, if any. Callers in this class invoke it while holding {@code stateChangeLock}.
     *
     * @throws IOException the recorded exception, wrapped in an {@link IOException} if it is not one already.
     */
    private void checkReadException() throws IOException {
        if (readAborted) {
            if (readException instanceof IOException) {
                throw (IOException) readException;
            }
            throw new IOException(readException);
        }
    }

    /**
     * Closes this stream.
     * <p>
     * If no read-ahead task is currently reading, the underlying input stream is closed here. Otherwise the running task closes it when it finishes.
     * An internally created executor service is shut down and awaited; a caller-supplied executor service is left running.
     * </p>
     *
     * @throws InterruptedIOException if interrupted while waiting for the executor service to terminate.
     * @throws IOException if closing the underlying input stream fails.
     */
    @Override
    public void close() throws IOException {
        boolean isSafeToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            if (isClosed) {
                return;
            }
            isClosed = true;
            if (!isReading) {
                // Nobody is reading, so we can close the underlying input stream in this method.
                isSafeToCloseUnderlyingInputStream = true;
                // Flip this to make sure the read ahead task will not close the underlying input stream.
                isUnderlyingInputStreamBeingClosed = true;
            }
        } finally {
            stateChangeLock.unlock();
        }

        try {
            if (shutdownExecutorService) {
                try {
                    executorService.shutdownNow();
                    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                } catch (final InterruptedException e) {
                    final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
                    iio.initCause(e);
                    throw iio;
                }
            }
        } finally {
            // Must run whether or not we own the executor: when isSafeToCloseUnderlyingInputStream is true,
            // no read-ahead task will close the stream for us.
            if (isSafeToCloseUnderlyingInputStream) {
                super.close();
            }
        }
    }

    /**
     * Called by the read-ahead task when it finishes. Clears {@code isReading} and, if {@link #close()} ran while the task was reading, closes the
     * underlying input stream on its behalf.
     */
    private void closeUnderlyingInputStreamIfNecessary() {
        boolean needToCloseUnderlyingInputStream = false;
        stateChangeLock.lock();
        try {
            isReading = false;
            if (isClosed && !isUnderlyingInputStreamBeingClosed) {
                // close method cannot close underlyingInputStream because we were reading.
                needToCloseUnderlyingInputStream = true;
            }
        } finally {
            stateChangeLock.unlock();
        }
        if (needToCloseUnderlyingInputStream) {
            try {
                super.close();
            } catch (final IOException ignored) {
                // Best effort: this runs on the executor thread, where there is no caller to report to.
                // TODO Rethrow as UncheckedIOException?
            }
        }
    }

    /**
     * Tests whether both buffers are drained and the underlying stream has reported end of stream.
     *
     * @return whether no more data can be returned.
     */
    private boolean isEndOfStream() {
        return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
    }

    @Override
    public int read() throws IOException {
        if (activeBuffer.hasRemaining()) {
            // short path - just get one byte.
            return activeBuffer.get() & 0xFF;
        }
        final byte[] oneByteArray = BYTE_ARRAY_1.get();
        oneByteArray[0] = 0;
        return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF;
    }

    @Override
    public int read(final byte[] b, final int offset, int len) throws IOException {
        if (offset < 0 || len < 0 || len > b.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }

        if (!activeBuffer.hasRemaining()) {
            // No remaining in active buffer - lock and switch to the read ahead buffer.
            stateChangeLock.lock();
            try {
                waitForAsyncReadComplete();
                if (!readAheadBuffer.hasRemaining()) {
                    // The first read.
                    readAsync();
                    waitForAsyncReadComplete();
                    if (isEndOfStream()) {
                        return EOF;
                    }
                }
                // Swap the newly read ahead buffer in place of empty active buffer.
                swapBuffers();
                // After swapping buffers, trigger another async read for read ahead buffer.
                readAsync();
            } finally {
                stateChangeLock.unlock();
            }
        }
        len = Math.min(len, activeBuffer.remaining());
        activeBuffer.get(b, offset, len);

        return len;
    }

    /**
     * Reads data from the underlying input stream into the read ahead buffer asynchronously.
     * <p>
     * Does nothing if end of stream was reached or a read is already in progress.
     * </p>
     *
     * @throws IOException if a previous asynchronous read failed.
     * @throws RejectedExecutionException if the executor service rejects the read-ahead task (for example after it was shut down); the in-progress
     *         state is reset first so later reads do not block forever.
     */
    private void readAsync() throws IOException {
        stateChangeLock.lock();
        final byte[] arr;
        try {
            arr = readAheadBuffer.array();
            if (endOfStream || readInProgress) {
                return;
            }
            checkReadException();
            // Empty the read ahead buffer before the task refills it.
            readAheadBuffer.position(0);
            readAheadBuffer.flip();
            readInProgress = true;
        } finally {
            stateChangeLock.unlock();
        }
        try {
            executorService.execute(() -> {
                stateChangeLock.lock();
                try {
                    if (isClosed) {
                        readInProgress = false;
                        return;
                    }
                    // Flip this so that the close method will not close the underlying input stream when we
                    // are reading.
                    isReading = true;
                } finally {
                    stateChangeLock.unlock();
                }

                // Please note that it is safe to release the lock and read into the read ahead buffer
                // because either of following two conditions will hold:
                //
                // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
                //
                // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
                // for this async read to complete.
                //
                // So there is no race condition in both the situations.
                int read = 0;
                int off = 0, len = arr.length;
                Throwable exception = null;
                try {
                    // try to fill the read ahead buffer.
                    // if a reader is waiting, possibly return early.
                    do {
                        read = in.read(arr, off, len);
                        if (read <= 0) {
                            break;
                        }
                        off += read;
                        len -= read;
                    } while (len > 0 && !isWaiting.get());
                } catch (final Throwable ex) {
                    exception = ex;
                    if (ex instanceof Error) {
                        // `readException` may not be reported to the user. Rethrow Error to make sure at least
                        // the user can see Error in UncaughtExceptionHandler.
                        throw (Error) ex;
                    }
                } finally {
                    stateChangeLock.lock();
                    try {
                        readAheadBuffer.limit(off);
                        if (read < 0 || exception instanceof EOFException) {
                            endOfStream = true;
                        } else if (exception != null) {
                            readAborted = true;
                            readException = exception;
                        }
                        readInProgress = false;
                        signalAsyncReadComplete();
                    } finally {
                        stateChangeLock.unlock();
                    }
                    closeUnderlyingInputStreamIfNecessary();
                }
            });
        } catch (final RejectedExecutionException e) {
            // The task will never run, so nobody else will clear readInProgress; without this reset the
            // next waitForAsyncReadComplete() would wait forever.
            stateChangeLock.lock();
            try {
                readInProgress = false;
                signalAsyncReadComplete();
            } finally {
                stateChangeLock.unlock();
            }
            throw e;
        }
    }

    /**
     * Wakes up all threads waiting in {@link #waitForAsyncReadComplete()}.
     */
    private void signalAsyncReadComplete() {
        stateChangeLock.lock();
        try {
            asyncReadComplete.signalAll();
        } finally {
            stateChangeLock.unlock();
        }
    }

    @Override
    public long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        if (n <= activeBuffer.remaining()) {
            // Only skipping from active buffer is sufficient
            activeBuffer.position((int) n + activeBuffer.position());
            return n;
        }
        stateChangeLock.lock();
        final long skipped;
        try {
            skipped = skipInternal(n);
        } finally {
            stateChangeLock.unlock();
        }
        return skipped;
    }

    /**
     * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before
     * calling this function.
     *
     * @param n the number of bytes to be skipped; greater than the active buffer's remaining bytes.
     * @return the actual number of bytes skipped.
     * @throws IOException if an I/O error occurs.
     */
    private long skipInternal(final long n) throws IOException {
        assert stateChangeLock.isLocked();
        waitForAsyncReadComplete();
        if (isEndOfStream()) {
            return 0;
        }
        if (available() >= n) {
            // we can skip from the internal buffers
            int toSkip = (int) n;
            // We need to skip from both active buffer and read ahead buffer
            toSkip -= activeBuffer.remaining();
            assert toSkip > 0; // skipping from activeBuffer already handled.
            activeBuffer.position(0);
            activeBuffer.flip();
            readAheadBuffer.position(toSkip + readAheadBuffer.position());
            swapBuffers();
            // Trigger async read to emptied read ahead buffer.
            readAsync();
            return n;
        }
        // Discard everything buffered, then skip the rest in the underlying stream.
        final int skippedBytes = available();
        final long toSkip = n - skippedBytes;
        activeBuffer.position(0);
        activeBuffer.flip();
        readAheadBuffer.position(0);
        readAheadBuffer.flip();
        final long skippedFromInputStream = in.skip(toSkip);
        readAsync();
        return skippedBytes + skippedFromInputStream;
    }

    /**
     * Swaps the active and read ahead buffers. Callers hold {@code stateChangeLock}.
     */
    private void swapBuffers() {
        final ByteBuffer temp = activeBuffer;
        activeBuffer = readAheadBuffer;
        readAheadBuffer = temp;
    }

    /**
     * Blocks until no asynchronous read is in progress, then rethrows any exception the read-ahead task recorded.
     *
     * @throws InterruptedIOException if interrupted while waiting.
     * @throws IOException if the read-ahead task failed.
     */
    private void waitForAsyncReadComplete() throws IOException {
        stateChangeLock.lock();
        try {
            isWaiting.set(true);
            // There is only one reader, and one writer, so the writer should signal only once,
            // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups.
            while (readInProgress) {
                asyncReadComplete.await();
            }
        } catch (final InterruptedException e) {
            final InterruptedIOException iio = new InterruptedIOException(e.getMessage());
            iio.initCause(e);
            throw iio;
        } finally {
            try {
                isWaiting.set(false);
            } finally {
                stateChangeLock.unlock();
            }
        }
        checkReadException();
    }
}