/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.compress.archivers.zip;

import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;

/**
 * Creates a ZIP in parallel, giving every worker thread of the executor its own
 * {@link ScatterZipOutputStream} (held in a {@link ThreadLocal}).
 * <p>
 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file.
 * Things that needed to come in a specific order (manifests, directories) had to be handled by the client of this
 * class, usually by writing these things to the {@link ZipArchiveOutputStream} <em>before</em> calling
 * {@link #writeTo writeTo} on this class.
 * </p>
 * <p>
 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency,
 * this will be shut down by this class prior to completion.
 * </p>
 *
 * @since 1.10
 */
public class ParallelScatterZipCreator {

    /**
     * How long {@link #writeTo writeTo} waits for the executor to terminate, in seconds. Chosen large enough to be
     * "infinite" in practice.
     */
    private static final long TERMINATION_TIMEOUT_SECONDS = 1000 * 60L;

    /** Every scatter stream created for a worker thread; all of them are closed by {@link #writeTo writeTo}. */
    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
    private final ExecutorService executorService;
    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;

    /** The submitted tasks, in submission order. */
    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
    private final long startedAt = System.currentTimeMillis();
    private long compressionDoneAt;
    private long scatterDoneAt;

    private final int compressionLevel;

    /**
     * Lazily creates one scatter stream per thread and registers it in {@link #streams} so that it can be closed
     * later.
     */
    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
        @Override
        protected ScatterZipOutputStream initialValue() {
            final ScatterZipOutputStream created;
            try {
                created = createDeferred(backingStoreSupplier);
            } catch (final IOException e) {
                // initialValue() cannot declare checked exceptions
                throw new UncheckedIOException(e); //NOSONAR
            }
            streams.add(created);
            return created;
        }
    };

    /**
     * Constructs a ParallelScatterZipCreator backed by a fixed thread pool whose size is the number of available
     * processors, as reported by {@link java.lang.Runtime#availableProcessors}.
     */
    public ParallelScatterZipCreator() {
        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    /**
     * Constructs a ParallelScatterZipCreator that uses a {@link DefaultBackingStoreSupplier}.
     *
     * @param executorService The executorService to use for parallel scheduling. For technical reasons,
     *                        this will be shut down by this class.
     */
    public ParallelScatterZipCreator(final ExecutorService executorService) {
        this(executorService, new DefaultBackingStoreSupplier(null));
    }

    /**
     * Constructs a ParallelScatterZipCreator that compresses with {@link Deflater#DEFAULT_COMPRESSION}.
     *
     * @param executorService      The executorService to use. For technical reasons, this will be shut down
     *                             by this class.
     * @param backingStoreSupplier The supplier of backing store which shall be used
     */
    public ParallelScatterZipCreator(final ExecutorService executorService,
                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
    }

    /**
     * Constructs a ParallelScatterZipCreator.
     *
     * @param executorService      The executorService to use. For technical reasons, this will be shut down
     *                             by this class.
     * @param backingStoreSupplier The supplier of backing store which shall be used
     * @param compressionLevel     The compression level used in compression, this value should be
     *                             -1(default level) or between 0~9.
     * @throws IllegalArgumentException if the compression level is illegal
     * @since 1.21
     */
    public ParallelScatterZipCreator(final ExecutorService executorService,
                                     final ScatterGatherBackingStoreSupplier backingStoreSupplier,
                                     final int compressionLevel) throws IllegalArgumentException {
        final boolean isDefaultLevel = compressionLevel == Deflater.DEFAULT_COMPRESSION;
        final boolean isExplicitLevel = compressionLevel >= Deflater.NO_COMPRESSION
                && compressionLevel <= Deflater.BEST_COMPRESSION;
        if (!isDefaultLevel && !isExplicitLevel) {
            throw new IllegalArgumentException("Compression level is expected between -1~9");
        }

        this.backingStoreSupplier = backingStoreSupplier;
        this.executorService = executorService;
        this.compressionLevel = compressionLevel;
    }

    /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread
     * </p>
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     */
    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
        final Callable<ScatterZipOutputStream> task = createCallable(zipArchiveEntry, source);
        submitStreamAwareCallable(task);
    }

    /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread
     * </p>
     *
     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
     * @since 1.13
     */
    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
        final Callable<ScatterZipOutputStream> task = createCallable(zipArchiveEntryRequestSupplier);
        submitStreamAwareCallable(task);
    }

    /**
     * Closes every registered scatter stream on a best-effort basis; a failure to close one stream does not prevent
     * the remaining ones from being closed.
     */
    private void closeAll() {
        for (final ScatterZipOutputStream stream : streams) {
            closeQuietly(stream);
        }
    }

    /**
     * Closes a single scatter stream and discards any {@link IOException} raised while doing so.
     *
     * @param stream the stream to close
     */
    private static void closeQuietly(final ScatterZipOutputStream stream) {
        try {
            stream.close();
        } catch (final IOException ex) { //NOSONAR
            // no way to properly log this
        }
    }

    /**
     * Creates a callable that will compress the given archive entry.
     *
     * <p>This method is expected to be called from a single client thread.</p>
     *
     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and
     * {@link #submitStreamAwareCallable submitStreamAwareCallable}. The most common use case for using
     * {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
     * client is if you want to wrap the callable in something that can be prioritized by the supplied
     * {@link ExecutorService}, for instance to process large or slow files first.
     * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     * @return A callable that should subsequently be passed to {@link #submitStreamAwareCallable}, possibly in a
     *         wrapped/adapted form. The value of this callable is not used, but any exceptions happening inside the
     *         compression will be propagated through the callable.
     * @throws IllegalArgumentException if no compression method has been set on the entry
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
                                                                 final InputStreamSupplier source) {
        // Reject entries without a method up front, on the calling thread.
        if (zipArchiveEntry.getMethod() == ZipMethod.UNKNOWN_CODE) {
            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
        }
        final ZipArchiveEntryRequest request = createZipArchiveEntryRequest(zipArchiveEntry, source);
        return () -> {
            final ScatterZipOutputStream workerStream = tlScatterStreams.get();
            workerStream.addArchiveEntry(request);
            return workerStream;
        };
    }

    /**
     * Creates a callable that will compress the archive entry supplied by a {@link ZipArchiveEntryRequestSupplier}.
     *
     * <p>This method is expected to be called from a single client thread.</p>
     *
     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
     * to be added is obtained from the supplier when the callable runs.
     *
     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
     *
     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
     * @return A callable that should subsequently be passed to {@link #submitStreamAwareCallable}, possibly in a
     *         wrapped/adapted form. The value of this callable is not used, but any exceptions happening inside the
     *         compression will be propagated through the callable.
     * @since 1.13
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
        return () -> {
            // Obtain the per-thread stream first, then ask the supplier for the request.
            final ScatterZipOutputStream workerStream = tlScatterStreams.get();
            workerStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
            return workerStream;
        };
    }

    /**
     * Creates a new scatter stream on top of a backing store obtained from the given supplier, compressing with this
     * instance's compression level.
     *
     * @param scatterGatherBackingStoreSupplier supplies the backing store for the new stream
     * @return the new scatter stream
     * @throws IOException if the backing store or the compressor cannot be created
     */
    @SuppressWarnings("resource") // Caller closes
    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
            throws IOException {
        final ScatterGatherBackingStore backingStore = scatterGatherBackingStoreSupplier.get();
        // lifecycle is bound to the ScatterZipOutputStream returned
        final StreamCompressor compressor = StreamCompressor.create(compressionLevel, backingStore); //NOSONAR
        return new ScatterZipOutputStream(backingStore, compressor);
    }

    /**
     * Gets the overall statistics of the compression run.
     *
     * @return the statistics, built from the time between construction and the end of compression and the time
     *         between the end of compression and the end of {@link #writeTo writeTo}
     */
    public ScatterStatistics getStatisticsMessage() {
        final long compressionElapsed = compressionDoneAt - startedAt;
        final long mergingElapsed = scatterDoneAt - compressionDoneAt;
        return new ScatterStatistics(compressionElapsed, mergingElapsed);
    }

    /**
     * Submits a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     *
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     */
    public final void submit(final Callable<? extends Object> callable) {
        // The callable's own result is discarded; the worker thread's stream is reported instead.
        final Callable<ScatterZipOutputStream> streamAware = () -> {
            callable.call();
            return tlScatterStreams.get();
        };
        submitStreamAwareCallable(streamAware);
    }

    /**
     * Submits a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     *
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     * @since 1.19
     */
    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
        final Future<? extends ScatterZipOutputStream> pending = executorService.submit(callable);
        futures.add(pending);
    }

    /**
     * Waits for every submitted task, shuts the executor down (even if a task failed) and then waits for the executor
     * to terminate.
     *
     * @throws InterruptedException If we get interrupted while waiting
     * @throws ExecutionException   If one of the submitted tasks threw an exception
     */
    private void awaitCompression() throws InterruptedException, ExecutionException {
        // Make sure we catch any exceptions from parallel phase
        try {
            for (final Future<?> pending : futures) {
                pending.get();
            }
        } finally {
            executorService.shutdown();
        }

        // == Infinity. We really *must* wait for this to complete
        executorService.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Writes the contents of this creator to the target {@link ZipArchiveOutputStream}.
     * <p>
     * It may be beneficial to write things like directories and manifest files to the targetStream before calling this
     * method.
     * </p>
     * <p>
     * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the
     * {@link Callable}s {@link #submitStreamAwareCallable submit}ted to this instance throws an exception, the archive
     * can not be created properly and this method will throw an exception.
     * </p>
     *
     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
     * @throws IOException          If writing fails
     * @throws InterruptedException If we get interrupted
     * @throws ExecutionException   If something happens in the parallel execution
     */
    public void writeTo(final ZipArchiveOutputStream targetStream)
            throws IOException, InterruptedException, ExecutionException {

        try {
            awaitCompression();

            // It is important that all threads terminate before we go on, ensure happens-before relationship
            compressionDoneAt = System.currentTimeMillis();

            // Walk the tasks in submission order; each task's stream hands over its next entry.
            for (final Future<? extends ScatterZipOutputStream> completed : futures) {
                final ScatterZipOutputStream workerStream = completed.get();
                workerStream.zipEntryWriter().writeNextZipEntry(targetStream);
            }

            // Close explicitly so that a failure to close is reported to the caller.
            for (final ScatterZipOutputStream workerStream : streams) {
                workerStream.close();
            }

            scatterDoneAt = System.currentTimeMillis();
        } finally {
            // Best-effort cleanup on every path, including failures above.
            closeAll();
        }
    }
}