001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.commons.compress.archivers.zip;
018
019
020import java.io.Closeable;
021import java.io.File;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.file.Path;
026import java.util.Iterator;
027import java.util.Queue;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.zip.Deflater;
031
032import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
033import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
034import org.apache.commons.compress.utils.BoundedInputStream;
035
036/**
037 * A ZIP output stream that is optimized for multi-threaded scatter/gather construction of ZIP files.
038 * <p>
039 * The internal data format of the entries used by this class are entirely private to this class and are not part of any public api whatsoever.
040 * </p>
041 * <p>
042 * It is possible to extend this class to support different kinds of backing storage, the default implementation only supports file-based backing.
043 * </p>
044 * <p>
045 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called by the thread that originally created the
046 * {@link ZipArchiveEntry}.
047 * </p>
048 *
049 * @since 1.10
050 */
051public class ScatterZipOutputStream implements Closeable {
052
053    private static class CompressedEntry {
054        final ZipArchiveEntryRequest zipArchiveEntryRequest;
055        final long crc;
056        final long compressedSize;
057        final long size;
058
059        public CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
060            this.zipArchiveEntryRequest = zipArchiveEntryRequest;
061            this.crc = crc;
062            this.compressedSize = compressedSize;
063            this.size = size;
064        }
065
066        /**
067         * Updates the original {@link ZipArchiveEntry} with sizes/crc.
068         * Do not use this method from threads that did not create the instance itself!
069         * @return the zipArchiveEntry that is the basis for this request.
070         */
071
072        public ZipArchiveEntry transferToArchiveEntry() {
073            final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
074            entry.setCompressedSize(compressedSize);
075            entry.setSize(size);
076            entry.setCrc(crc);
077            entry.setMethod(zipArchiveEntryRequest.getMethod());
078            return entry;
079        }
080    }
081
082    public static class ZipEntryWriter implements Closeable {
083        private final Iterator<CompressedEntry> itemsIterator;
084        private final InputStream itemsIteratorData;
085
086        public ZipEntryWriter(final ScatterZipOutputStream scatter) throws IOException {
087            scatter.backingStore.closeForWriting();
088            itemsIterator = scatter.items.iterator();
089            itemsIteratorData = scatter.backingStore.getInputStream();
090        }
091
092        @Override
093        public void close() throws IOException {
094            if (itemsIteratorData != null) {
095                itemsIteratorData.close();
096            }
097        }
098
099        public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException {
100            final CompressedEntry compressedEntry = itemsIterator.next();
101            try (final BoundedInputStream rawStream = new BoundedInputStream(itemsIteratorData, compressedEntry.compressedSize)) {
102                target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
103            }
104        }
105    }
106
107    /**
108     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
109     *
110     * @param file The file to offload compressed data into.
111     * @return A ScatterZipOutputStream that is ready for use.
112     * @throws FileNotFoundException if the file cannot be found
113     */
114    public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
115        return pathBased(file.toPath(), Deflater.DEFAULT_COMPRESSION);
116    }
117
118    /**
119     * Creates a {@link ScatterZipOutputStream} that is backed by a file
120     *
121     * @param file             The file to offload compressed data into.
122     * @param compressionLevel The compression level to use, @see #Deflater
123     * @return A  ScatterZipOutputStream that is ready for use.
124     * @throws FileNotFoundException if the file cannot be found
125     */
126    public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
127        return pathBased(file.toPath(), compressionLevel);
128    }
129
130    /**
131     * Creates a {@link ScatterZipOutputStream} with default compression level that is backed by a file
132     * @param path The path to offload compressed data into.
133     * @return A ScatterZipOutputStream that is ready for use.
134     * @throws FileNotFoundException if the path cannot be found
135     * @since 1.22
136     */
137    public static ScatterZipOutputStream pathBased(final Path path) throws FileNotFoundException {
138        return pathBased(path, Deflater.DEFAULT_COMPRESSION);
139    }
140
141    /**
142     * Creates a {@link ScatterZipOutputStream} that is backed by a file
143     * @param path The path to offload compressed data into.
144     * @param compressionLevel The compression level to use, @see #Deflater
145     * @return A ScatterZipOutputStream that is ready for use.
146     * @throws FileNotFoundException if the path cannot be found
147     * @since 1.22
148     */
149    public static ScatterZipOutputStream pathBased(final Path path, final int compressionLevel) throws FileNotFoundException {
150        final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(path);
151        // lifecycle is bound to the ScatterZipOutputStream returned
152        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR
153        return new ScatterZipOutputStream(bs, sc);
154    }
155
156    private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
157
158    private final ScatterGatherBackingStore backingStore;
159
160    private final StreamCompressor streamCompressor;
161
162    private final AtomicBoolean isClosed = new AtomicBoolean();
163
164    private ZipEntryWriter zipEntryWriter;
165
166    public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore,
167                                  final StreamCompressor streamCompressor) {
168        this.backingStore = backingStore;
169        this.streamCompressor = streamCompressor;
170    }
171
172    /**
173     * Adds an archive entry to this scatter stream.
174     *
175     * @param zipArchiveEntryRequest The entry to write.
176     * @throws IOException    If writing fails
177     */
178    public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
179        try (final InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
180            streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
181        }
182        items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(),
183                                      streamCompressor.getBytesWrittenForLastEntry(), streamCompressor.getBytesRead()));
184    }
185
186    /**
187     * Closes this stream, freeing all resources involved in the creation of this stream.
188     * @throws IOException If closing fails
189     */
190    @Override
191    public void close() throws IOException {
192        if (!isClosed.compareAndSet(false, true)) {
193            return;
194        }
195        try {
196            if (zipEntryWriter != null) {
197                zipEntryWriter.close();
198            }
199            backingStore.close();
200        } finally {
201            streamCompressor.close();
202        }
203    }
204
205    /**
206     * Writes the contents of this scatter stream to a target archive.
207     *
208     * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
209     * @throws IOException If writing fails
210     * @see #zipEntryWriter()
211     */
212    public void writeTo(final ZipArchiveOutputStream target) throws IOException {
213        backingStore.closeForWriting();
214        try (final InputStream data = backingStore.getInputStream()) {
215            for (final CompressedEntry compressedEntry : items) {
216                try (final BoundedInputStream rawStream = new BoundedInputStream(data,
217                        compressedEntry.compressedSize)) {
218                    target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
219                }
220            }
221        }
222    }
223
224    /**
225     * Gets a ZIP entry writer for this scatter stream.
226     * @throws IOException If getting scatter stream input stream
227     * @return the ZipEntryWriter created on first call of the method
228     */
229    public ZipEntryWriter zipEntryWriter() throws IOException {
230        if (zipEntryWriter == null) {
231            zipEntryWriter = new ZipEntryWriter(this);
232        }
233        return zipEntryWriter;
234    }
235}