001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.compress.compressors.CompressorInputStream;
027import org.apache.commons.compress.utils.BoundedInputStream;
028import org.apache.commons.compress.utils.ByteUtils;
029import org.apache.commons.compress.utils.CountingInputStream;
030import org.apache.commons.compress.utils.IOUtils;
031import org.apache.commons.compress.utils.InputStreamStatistics;
032
033/**
034 * CompressorInputStream for the framing Snappy format.
035 *
036 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
037 *
038 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
039 * @since 1.7
040 */
041public class FramedSnappyCompressorInputStream extends CompressorInputStream
042    implements InputStreamStatistics {
043
044    /**
045     * package private for tests only.
046     */
047    static final long MASK_OFFSET = 0xa282ead8L;
048
049    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
050    static final int COMPRESSED_CHUNK_TYPE = 0;
051    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
052    private static final int PADDING_CHUNK_TYPE = 0xfe;
053    private static final int MIN_UNSKIPPABLE_TYPE = 2;
054    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
055    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
056
057    // used by FramedSnappyCompressorOutputStream as well
058    static final byte[] SZ_SIGNATURE = { //NOSONAR
059        (byte) STREAM_IDENTIFIER_TYPE, // tag
060        6, 0, 0, // length
061        's', 'N', 'a', 'P', 'p', 'Y'
062    };
063
064    /**
065     * Checks if the signature matches what is expected for a .sz file.
066     *
067     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
068     *
069     * @param signature the bytes to check
070     * @param length    the number of bytes to check
071     * @return          true if this is a .sz stream, false otherwise
072     */
073    public static boolean matches(final byte[] signature, final int length) {
074
075        if (length < SZ_SIGNATURE.length) {
076            return false;
077        }
078
079        byte[] shortenedSig = signature;
080        if (signature.length > SZ_SIGNATURE.length) {
081            shortenedSig = Arrays.copyOf(signature, SZ_SIGNATURE.length);
082        }
083
084        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
085    }
086    static long unmask(long x) {
087        // ugly, maybe we should just have used ints and deal with the
088        // overflow
089        x -= MASK_OFFSET;
090        x &= 0xffffFFFFL;
091        return (x >> 17 | x << 15) & 0xffffFFFFL;
092    }
093
094    private long unreadBytes;
095
096    private final CountingInputStream countingStream;
097
098    /** The underlying stream to read compressed data from */
099    private final PushbackInputStream inputStream;
100
101    /** The dialect to expect */
102    private final FramedSnappyDialect dialect;
103
104    private SnappyCompressorInputStream currentCompressedChunk;
105
106    // used in no-arg read method
107    private final byte[] oneByte = new byte[1];
108    private boolean endReached, inUncompressedChunk;
109    private int uncompressedBytesRemaining;
110    private long expectedChecksum = -1;
111
112    private final int blockSize;
113
114    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
115
116    private final ByteUtils.ByteSupplier supplier = this::readOneByte;
117
118    /**
119     * Constructs a new input stream that decompresses
120     * snappy-framed-compressed data from the specified input stream
121     * using the {@link FramedSnappyDialect#STANDARD} dialect.
122     * @param in  the InputStream from which to read the compressed data
123     * @throws IOException if reading fails
124     */
125    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
126        this(in, FramedSnappyDialect.STANDARD);
127    }
128
129    /**
130     * Constructs a new input stream that decompresses snappy-framed-compressed data
131     * from the specified input stream.
132     * @param in  the InputStream from which to read the compressed data
133     * @param dialect the dialect used by the compressed stream
134     * @throws IOException if reading fails
135     */
136    public FramedSnappyCompressorInputStream(final InputStream in,
137                                             final FramedSnappyDialect dialect)
138        throws IOException {
139        this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
140    }
141
142    /**
143     * Constructs a new input stream that decompresses snappy-framed-compressed data
144     * from the specified input stream.
145     * @param in  the InputStream from which to read the compressed data
146     * @param blockSize the block size to use for the compressed stream
147     * @param dialect the dialect used by the compressed stream
148     * @throws IOException if reading fails
149     * @throws IllegalArgumentException if blockSize is not bigger than 0
150     * @since 1.14
151     */
152    public FramedSnappyCompressorInputStream(final InputStream in,
153                                             final int blockSize,
154                                             final FramedSnappyDialect dialect)
155        throws IOException {
156        if (blockSize <= 0) {
157            throw new IllegalArgumentException("blockSize must be bigger than 0");
158        }
159        countingStream = new CountingInputStream(in);
160        this.inputStream = new PushbackInputStream(countingStream, 1);
161        this.blockSize = blockSize;
162        this.dialect = dialect;
163        if (dialect.hasStreamIdentifier()) {
164            readStreamIdentifier();
165        }
166    }
167
168    /** {@inheritDoc} */
169    @Override
170    public int available() throws IOException {
171        if (inUncompressedChunk) {
172            return Math.min(uncompressedBytesRemaining,
173                            inputStream.available());
174        }
175        if (currentCompressedChunk != null) {
176            return currentCompressedChunk.available();
177        }
178        return 0;
179    }
180
181    /** {@inheritDoc} */
182    @Override
183    public void close() throws IOException {
184        try {
185            if (currentCompressedChunk != null) {
186                currentCompressedChunk.close();
187                currentCompressedChunk = null;
188            }
189        } finally {
190            inputStream.close();
191        }
192    }
193
194    /**
195     * @since 1.17
196     */
197    @Override
198    public long getCompressedCount() {
199        return countingStream.getBytesRead() - unreadBytes;
200    }
201
202    /** {@inheritDoc} */
203    @Override
204    public int read() throws IOException {
205        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
206    }
207
208    /** {@inheritDoc} */
209    @Override
210    public int read(final byte[] b, final int off, final int len) throws IOException {
211        if (len == 0) {
212            return 0;
213        }
214        int read = readOnce(b, off, len);
215        if (read == -1) {
216            readNextBlock();
217            if (endReached) {
218                return -1;
219            }
220            read = readOnce(b, off, len);
221        }
222        return read;
223    }
224
225    private long readCrc() throws IOException {
226        final byte[] b = new byte[4];
227        final int read = IOUtils.readFully(inputStream, b);
228        count(read);
229        if (read != 4) {
230            throw new IOException("Premature end of stream");
231        }
232        return ByteUtils.fromLittleEndian(b);
233    }
234
235    private void readNextBlock() throws IOException {
236        verifyLastChecksumAndReset();
237        inUncompressedChunk = false;
238        final int type = readOneByte();
239        if (type == -1) {
240            endReached = true;
241        } else if (type == STREAM_IDENTIFIER_TYPE) {
242            inputStream.unread(type);
243            unreadBytes++;
244            pushedBackBytes(1);
245            readStreamIdentifier();
246            readNextBlock();
247        } else if (type == PADDING_CHUNK_TYPE
248                   || type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE) {
249            skipBlock();
250            readNextBlock();
251        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
252            throw new IOException("Unskippable chunk with type " + type
253                                  + " (hex " + Integer.toHexString(type) + ")"
254                                  + " detected.");
255        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
256            inUncompressedChunk = true;
257            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
258            if (uncompressedBytesRemaining < 0) {
259                throw new IOException("Found illegal chunk with negative size");
260            }
261            expectedChecksum = unmask(readCrc());
262        } else if (type == COMPRESSED_CHUNK_TYPE) {
263            final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
264            final long size = readSize() - (expectChecksum ? 4L : 0L);
265            if (size < 0) {
266                throw new IOException("Found illegal chunk with negative size");
267            }
268            if (expectChecksum) {
269                expectedChecksum = unmask(readCrc());
270            } else {
271                expectedChecksum = -1;
272            }
273            currentCompressedChunk =
274                new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize);
275            // constructor reads uncompressed size
276            count(currentCompressedChunk.getBytesRead());
277        } else {
278            // impossible as all potential byte values have been covered
279            throw new IOException("Unknown chunk type " + type
280                                  + " detected.");
281        }
282    }
283
284    /**
285     * Read from the current chunk into the given array.
286     *
287     * @return -1 if there is no current chunk or the number of bytes
288     * read from the current chunk (which may be -1 if the end of the
289     * chunk is reached).
290     */
291    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
292        int read = -1;
293        if (inUncompressedChunk) {
294            final int amount = Math.min(uncompressedBytesRemaining, len);
295            if (amount == 0) {
296                return -1;
297            }
298            read = inputStream.read(b, off, amount);
299            if (read != -1) {
300                uncompressedBytesRemaining -= read;
301                count(read);
302            }
303        } else if (currentCompressedChunk != null) {
304            final long before = currentCompressedChunk.getBytesRead();
305            read = currentCompressedChunk.read(b, off, len);
306            if (read == -1) {
307                currentCompressedChunk.close();
308                currentCompressedChunk = null;
309            } else {
310                count(currentCompressedChunk.getBytesRead() - before);
311            }
312        }
313        if (read > 0) {
314            checksum.update(b, off, read);
315        }
316        return read;
317    }
318
319    private int readOneByte() throws IOException {
320        final int b = inputStream.read();
321        if (b != -1) {
322            count(1);
323            return b & 0xFF;
324        }
325        return -1;
326    }
327
328    private int readSize() throws IOException {
329        return (int) ByteUtils.fromLittleEndian(supplier, 3);
330    }
331
332    private void readStreamIdentifier() throws IOException {
333        final byte[] b = new byte[10];
334        final int read = IOUtils.readFully(inputStream, b);
335        count(read);
336        if (10 != read || !matches(b, 10)) {
337            throw new IOException("Not a framed Snappy stream");
338        }
339    }
340
341    private void skipBlock() throws IOException {
342        final int size = readSize();
343        if (size < 0) {
344            throw new IOException("Found illegal chunk with negative size");
345        }
346        final long read = IOUtils.skip(inputStream, size);
347        count(read);
348        if (read != size) {
349            throw new IOException("Premature end of stream");
350        }
351    }
352
353    private void verifyLastChecksumAndReset() throws IOException {
354        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
355            throw new IOException("Checksum verification failed");
356        }
357        expectedChecksum = -1;
358        checksum.reset();
359    }
360
361}