001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.commons.compress.archivers.zip;
018
019import java.io.Closeable;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.channels.SeekableByteChannel;
026import java.util.zip.CRC32;
027import java.util.zip.Deflater;
028import java.util.zip.ZipEntry;
029
030import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
031
032/**
033 * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams.
034 * Currently {@link java.util.zip.ZipEntry#DEFLATED} and {@link java.util.zip.ZipEntry#STORED} are the only
035 * supported compression methods.
036 *
037 * @since 1.10
038 */
039public abstract class StreamCompressor implements Closeable {
040
041    private static final class DataOutputCompressor extends StreamCompressor {
042        private final DataOutput raf;
043
044        public DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
045            super(deflater);
046            this.raf = raf;
047        }
048
049        @Override
050        protected void writeOut(final byte[] data, final int offset, final int length)
051                throws IOException {
052            raf.write(data, offset, length);
053        }
054    }
055
056    private static final class OutputStreamCompressor extends StreamCompressor {
057        private final OutputStream os;
058
059        public OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
060            super(deflater);
061            this.os = os;
062        }
063
064        @Override
065        protected void writeOut(final byte[] data, final int offset, final int length)
066                throws IOException {
067            os.write(data, offset, length);
068        }
069    }
070
071    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
072        private final ScatterGatherBackingStore bs;
073
074        public ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
075            super(deflater);
076            this.bs = bs;
077        }
078
079        @Override
080        protected void writeOut(final byte[] data, final int offset, final int length)
081                throws IOException {
082            bs.writeOut(data, offset, length);
083        }
084    }
085
086    private static final class SeekableByteChannelCompressor extends StreamCompressor {
087        private final SeekableByteChannel channel;
088
089        public SeekableByteChannelCompressor(final Deflater deflater,
090                                             final SeekableByteChannel channel) {
091            super(deflater);
092            this.channel = channel;
093        }
094
095        @Override
096        protected void writeOut(final byte[] data, final int offset, final int length)
097                throws IOException {
098            channel.write(ByteBuffer.wrap(data, offset, length));
099        }
100    }
101    /*
102     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs
103     * when it gets handed a huge buffer.  See
104     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
105     *
106     * Using a buffer size of 8 kB proved to be a good compromise
107     */
108    private static final int DEFLATER_BLOCK_SIZE = 8192;
109    private static final int BUFFER_SIZE = 4096;
110
111    /**
112     * Create a stream compressor with the given compression level.
113     *
114     * @param os       The DataOutput to receive output
115     * @param deflater The deflater to use for the compressor
116     * @return A stream compressor
117     */
118    static StreamCompressor create(final DataOutput os, final Deflater deflater) {
119        return new DataOutputCompressor(deflater, os);
120    }
121    /**
122     * Create a stream compressor with the given compression level.
123     *
124     * @param compressionLevel The {@link Deflater}  compression level
125     * @param bs               The ScatterGatherBackingStore to receive output
126     * @return A stream compressor
127     */
128    public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
129        final Deflater deflater = new Deflater(compressionLevel, true);
130        return new ScatterGatherBackingStoreCompressor(deflater, bs);
131    }
132    /**
133     * Create a stream compressor with the default compression level.
134     *
135     * @param os The stream to receive output
136     * @return A stream compressor
137     */
138    static StreamCompressor create(final OutputStream os) {
139        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
140    }
141
142    /**
143     * Create a stream compressor with the given compression level.
144     *
145     * @param os       The stream to receive output
146     * @param deflater The deflater to use
147     * @return A stream compressor
148     */
149    static StreamCompressor create(final OutputStream os, final Deflater deflater) {
150        return new OutputStreamCompressor(deflater, os);
151    }
152
153    /**
154     * Create a stream compressor with the default compression level.
155     *
156     * @param bs The ScatterGatherBackingStore to receive output
157     * @return A stream compressor
158     */
159    public static StreamCompressor create(final ScatterGatherBackingStore bs) {
160        return create(Deflater.DEFAULT_COMPRESSION, bs);
161    }
162
163    /**
164     * Create a stream compressor with the given compression level.
165     *
166     * @param os       The SeekableByteChannel to receive output
167     * @param deflater The deflater to use for the compressor
168     * @return A stream compressor
169     * @since 1.13
170     */
171    static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
172        return new SeekableByteChannelCompressor(deflater, os);
173    }
174
175    private final Deflater def;
176
177    private final CRC32 crc = new CRC32();
178
179    private long writtenToOutputStreamForLastEntry;
180
181    private long sourcePayloadLength;
182
183    private long totalWrittenToOutputStream;
184
185    private final byte[] outputBuffer = new byte[BUFFER_SIZE];
186
187    private final byte[] readerBuf = new byte[BUFFER_SIZE];
188
189    StreamCompressor(final Deflater deflater) {
190        this.def = deflater;
191    }
192
193
194    @Override
195    public void close() throws IOException {
196        def.end();
197    }
198
199    void deflate() throws IOException {
200        final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
201        if (len > 0) {
202            writeCounted(outputBuffer, 0, len);
203        }
204    }
205
206
207    /**
208     * Deflate the given source using the supplied compression method
209     *
210     * @param source The source to compress
211     * @param method The #ZipArchiveEntry compression method
212     * @throws IOException When failures happen
213     */
214
215    public void deflate(final InputStream source, final int method) throws IOException {
216        reset();
217        int length;
218
219        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
220            write(readerBuf, 0, length, method);
221        }
222        if (method == ZipEntry.DEFLATED) {
223            flushDeflater();
224        }
225    }
226
227    private void deflateUntilInputIsNeeded() throws IOException {
228        while (!def.needsInput()) {
229            deflate();
230        }
231    }
232
233    void flushDeflater() throws IOException {
234        def.finish();
235        while (!def.finished()) {
236            deflate();
237        }
238    }
239
240    /**
241     * Return the number of bytes read from the source stream
242     *
243     * @return The number of bytes read, never negative
244     */
245    public long getBytesRead() {
246        return sourcePayloadLength;
247    }
248
249    /**
250     * The number of bytes written to the output for the last entry
251     *
252     * @return The number of bytes, never negative
253     */
254    public long getBytesWrittenForLastEntry() {
255        return writtenToOutputStreamForLastEntry;
256    }
257
258    /**
259     * The crc32 of the last deflated file
260     *
261     * @return the crc32
262     */
263
264    public long getCrc32() {
265        return crc.getValue();
266    }
267
268    /**
269     * The total number of bytes written to the output for all files
270     *
271     * @return The number of bytes, never negative
272     */
273    public long getTotalBytesWritten() {
274        return totalWrittenToOutputStream;
275    }
276
277    void reset() {
278        crc.reset();
279        def.reset();
280        sourcePayloadLength = 0;
281        writtenToOutputStreamForLastEntry = 0;
282    }
283
284    /**
285     * Writes bytes to ZIP entry.
286     *
287     * @param b      the byte array to write
288     * @param offset the start position to write from
289     * @param length the number of bytes to write
290     * @param method the comrpession method to use
291     * @return the number of bytes written to the stream this time
292     * @throws IOException on error
293     */
294    long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
295        final long current = writtenToOutputStreamForLastEntry;
296        crc.update(b, offset, length);
297        if (method == ZipEntry.DEFLATED) {
298            writeDeflated(b, offset, length);
299        } else {
300            writeCounted(b, offset, length);
301        }
302        sourcePayloadLength += length;
303        return writtenToOutputStreamForLastEntry - current;
304    }
305
306    public void writeCounted(final byte[] data) throws IOException {
307        writeCounted(data, 0, data.length);
308    }
309
310    public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
311        writeOut(data, offset, length);
312        writtenToOutputStreamForLastEntry += length;
313        totalWrittenToOutputStream += length;
314    }
315
316    private void writeDeflated(final byte[] b, final int offset, final int length)
317            throws IOException {
318        if (length > 0 && !def.finished()) {
319            if (length <= DEFLATER_BLOCK_SIZE) {
320                def.setInput(b, offset, length);
321                deflateUntilInputIsNeeded();
322            } else {
323                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
324                for (int i = 0; i < fullblocks; i++) {
325                    def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE,
326                            DEFLATER_BLOCK_SIZE);
327                    deflateUntilInputIsNeeded();
328                }
329                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
330                if (done < length) {
331                    def.setInput(b, offset + done, length - done);
332                    deflateUntilInputIsNeeded();
333                }
334            }
335        }
336    }
337
338    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
339}