/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.compress.archivers.zip;

import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;

/**
 * Creates a ZIP in parallel by using multiple thread-local {@link ScatterZipOutputStream} instances.
 * <p>
 * Note that until 1.18, this class generally made no guarantees about the order of things written to the output file. Things that needed to come in a
 * specific order (manifests, directories) had to be handled by the client of this class, usually by writing these things to the
 * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.
 * </p>
 * <p>
 * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of memory model consistency, this will be shut down by this class
 * prior to completion.
 * </p>
 *
 * @since 1.10
 */
public class ParallelScatterZipCreator {

    /**
     * How long {@link #writeTo writeTo} waits for the executor to terminate, in seconds (60,000 seconds); intended to be effectively unbounded.
     */
    private static final long TERMINATION_TIMEOUT_SECONDS = 1000 * 60L;

    /** Every scatter stream created so far, one per worker thread that has run a task. */
    private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
    private final ExecutorService executorService;
    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;

    /** Futures of all submitted tasks, kept in submission order. */
    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
    private final long startedAt = System.currentTimeMillis();
    private long compressionDoneAt;
    private long scatterDoneAt;

    private final int compressionLevel;

    /** Lazily creates (and registers in {@link #streams}) one scatter stream per thread that asks for it. */
    private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams;

    /**
     * Constructs a ParallelScatterZipCreator backed by a fixed thread pool whose size is the number of available processors, as reported by
     * {@link java.lang.Runtime#availableProcessors}.
     */
    public ParallelScatterZipCreator() {
        this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    /**
     * Constructs a ParallelScatterZipCreator using a {@code DefaultBackingStoreSupplier} created with a {@code null} argument.
     *
     * @param executorService The executorService to use for parallel scheduling. For technical reasons, this will be shut down by this class.
     */
    public ParallelScatterZipCreator(final ExecutorService executorService) {
        this(executorService, new DefaultBackingStoreSupplier(null));
    }

    /**
     * Constructs a ParallelScatterZipCreator that compresses with {@link Deflater#DEFAULT_COMPRESSION}.
     *
     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
     * @param backingStoreSupplier The supplier of the backing store which shall be used
     */
    public ParallelScatterZipCreator(final ExecutorService executorService,
            final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
        this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION);
    }

    /**
     * Constructs a ParallelScatterZipCreator.
     *
     * @param executorService      The executorService to use. For technical reasons, this will be shut down by this class.
     * @param backingStoreSupplier The supplier of the backing store which shall be used
     * @param compressionLevel     The compression level used in compression, this value should be -1 (default level) or between 0~9.
     * @throws IllegalArgumentException if the compression level is illegal
     * @since 1.21
     */
    public ParallelScatterZipCreator(final ExecutorService executorService,
            final ScatterGatherBackingStoreSupplier backingStoreSupplier,
            final int compressionLevel) throws IllegalArgumentException {
        // Validate first so that nothing is assigned when the level is rejected.
        this.compressionLevel = requireValidCompressionLevel(compressionLevel);
        this.executorService = executorService;
        this.backingStoreSupplier = backingStoreSupplier;
        this.tlScatterStreams = ThreadLocal.withInitial(this::newRegisteredScatterStream);
    }

    /**
     * Returns the given level if it is {@link Deflater#DEFAULT_COMPRESSION} or lies between {@link Deflater#NO_COMPRESSION} and
     * {@link Deflater#BEST_COMPRESSION} (inclusive).
     *
     * @param level the candidate compression level
     * @return {@code level}, unchanged
     * @throws IllegalArgumentException if the level is outside the accepted values
     */
    private static int requireValidCompressionLevel(final int level) {
        final boolean explicitLevel = level >= Deflater.NO_COMPRESSION && level <= Deflater.BEST_COMPRESSION;
        if (explicitLevel || level == Deflater.DEFAULT_COMPRESSION) {
            return level;
        }
        throw new IllegalArgumentException("Compression level is expected between -1~9");
    }

    /**
     * Creates a new scatter stream and records it in {@link #streams} so that it can be closed later.
     *
     * @return the newly created stream
     * @throws UncheckedIOException wrapping any {@link IOException} raised while creating the stream
     */
    private ScatterZipOutputStream newRegisteredScatterStream() {
        try {
            final ScatterZipOutputStream created = createDeferred(backingStoreSupplier);
            streams.add(created);
            return created;
        } catch (final IOException e) {
            throw new UncheckedIOException(e); //NOSONAR
        }
    }

    /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread.
     * </p>
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     */
    public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
        final Callable<ScatterZipOutputStream> task = createCallable(zipArchiveEntry, source);
        submitStreamAwareCallable(task);
    }

    /**
     * Adds an archive entry to this archive.
     * <p>
     * This method is expected to be called from a single client thread.
     * </p>
     *
     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
     * @since 1.13
     */
    public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
        final Callable<ScatterZipOutputStream> task = createCallable(zipArchiveEntryRequestSupplier);
        submitStreamAwareCallable(task);
    }

    /**
     * Closes every scatter stream created so far, ignoring any {@link IOException} so that the remaining streams are still closed.
     */
    private void closeAll() {
        for (final ScatterZipOutputStream stream : streams) {
            try {
                stream.close();
            } catch (final IOException ignored) { //NOSONAR
                // no way to properly log this
            }
        }
    }

    /**
     * Creates a callable that will compress the given archive entry.
     *
     * <p>This method is expected to be called from a single client thread.</p>
     *
     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}.
     * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
     * client is if you want to wrap the callable in something that can be prioritized by the supplied
     * {@link ExecutorService}, for instance to process large or slow files first.
     * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
     *
     * @param zipArchiveEntry The entry to add.
     * @param source          The source input stream supplier
     * @return A callable that should subsequently be passed to {@link #submitStreamAwareCallable submitStreamAwareCallable}, possibly in a
     *         wrapped/adapted form. The value of this callable is not used, but any exceptions happening inside the compression
     *         will be propagated through the callable.
     * @throws IllegalArgumentException if the entry's method is {@link ZipMethod#UNKNOWN_CODE}
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
            final InputStreamSupplier source) {
        if (zipArchiveEntry.getMethod() == ZipMethod.UNKNOWN_CODE) {
            throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
        }
        // The request is built eagerly on the calling thread; only the compression itself is deferred.
        final ZipArchiveEntryRequest request = createZipArchiveEntryRequest(zipArchiveEntry, source);
        return () -> {
            final ScatterZipOutputStream threadStream = tlScatterStreams.get();
            threadStream.addArchiveEntry(request);
            return threadStream;
        };
    }

    /**
     * Creates a callable that will compress the archive entry supplied by a {@link ZipArchiveEntryRequestSupplier}.
     *
     * <p>This method is expected to be called from a single client thread.</p>
     *
     * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
     * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}, which is only invoked when the callable runs.
     *
     * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
     *
     * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
     * @return A callable that should subsequently be passed to {@link #submitStreamAwareCallable submitStreamAwareCallable}, possibly in a
     *         wrapped/adapted form. The value of this callable is not used, but any exceptions happening inside the compression
     *         will be propagated through the callable.
     * @since 1.13
     */
    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
        return () -> {
            // Obtain the thread's stream before asking the supplier for the request.
            final ScatterZipOutputStream threadStream = tlScatterStreams.get();
            threadStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get());
            return threadStream;
        };
    }

    /**
     * Creates a scatter stream on top of a backing store obtained from the given supplier, compressing with this instance's compression level.
     *
     * @param scatterGatherBackingStoreSupplier the supplier asked for a new backing store
     * @return a new scatter stream
     * @throws IOException if the supplier or the stream creation fails
     */
    private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
            throws IOException {
        final ScatterGatherBackingStore backingStore = scatterGatherBackingStoreSupplier.get();
        // lifecycle is bound to the ScatterZipOutputStream returned
        final StreamCompressor compressor = StreamCompressor.create(compressionLevel, backingStore); //NOSONAR
        return new ScatterZipOutputStream(backingStore, compressor);
    }

    /**
     * Gets the overall statistics of the compression run.
     *
     * @return A {@link ScatterStatistics} built from the time elapsed between construction and the end of compression, and the time elapsed
     *         between the end of compression and the end of writing to the target stream.
     */
    public ScatterStatistics getStatisticsMessage() {
        final long compressionElapsed = compressionDoneAt - startedAt;
        final long mergeElapsed = scatterDoneAt - compressionDoneAt;
        return new ScatterStatistics(compressionElapsed, mergeElapsed);
    }

    /**
     * Submits a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     *
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     */
    public final void submit(final Callable<? extends Object> callable) {
        submitStreamAwareCallable(() -> {
            // The callable's own result is discarded; the executing thread's stream is reported instead.
            callable.call();
            return tlScatterStreams.get();
        });
    }

    /**
     * Submits a callable for compression.
     *
     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
     *
     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
     * @since 1.19
     */
    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
        final Future<? extends ScatterZipOutputStream> pending = executorService.submit(callable);
        futures.add(pending);
    }

    /**
     * Waits for every submitted task to finish, then initiates shutdown of the executor whether or not a task failed.
     *
     * @throws InterruptedException If we get interrupted while waiting
     * @throws ExecutionException   If one of the tasks threw an exception
     */
    private void awaitSubmittedTasks() throws InterruptedException, ExecutionException {
        try {
            for (final Future<?> pending : futures) {
                pending.get();
            }
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Writes the contents of this instance to the target {@link ZipArchiveOutputStream}.
     * <p>
     * It may be beneficial to write things like directories and manifest files to the targetStream before calling this method.
     * </p>
     * <p>
     * Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link Callable}s {@link #submitStreamAwareCallable
     * submit}ted to this instance throws an exception, the archive can not be created properly and this method will throw an exception.
     * </p>
     *
     * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
     * @throws IOException          If writing fails
     * @throws InterruptedException If we get interrupted
     * @throws ExecutionException   If something happens in the parallel execution
     */
    public void writeTo(final ZipArchiveOutputStream targetStream)
            throws IOException, InterruptedException, ExecutionException {
        try {
            // Make sure we catch any exceptions from the parallel phase
            awaitSubmittedTasks();

            // Effectively unbounded wait: all worker threads must terminate before we go on,
            // to ensure a happens-before relationship with what follows.
            executorService.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            compressionDoneAt = System.currentTimeMillis();

            // Walk the futures in submission order; each one names the stream holding its entry.
            for (final Future<? extends ScatterZipOutputStream> completed : futures) {
                completed.get().zipEntryWriter().writeNextZipEntry(targetStream);
            }

            for (final ScatterZipOutputStream stream : streams) {
                stream.close();
            }

            scatterDoneAt = System.currentTimeMillis();
        } finally {
            closeAll();
        }
    }
}