/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.commons.compress.compressors.snappy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.lz77support.Parameters;
import org.apache.commons.compress.utils.ByteUtils;

/**
 * CompressorOutputStream for the framing Snappy format.
 *
 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
 *
 * <p>The stream signature is written on construction. Written bytes are
 * collected into a buffer of at most 65536 bytes. Each time the buffer fills
 * up, and once more for any remainder on {@link #finish()} / {@link #close()},
 * the buffered bytes are written as one compressed chunk.</p>
 *
 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
 * @since 1.14
 * @NotThreadSafe
 */
public class FramedSnappyCompressorOutputStream extends CompressorOutputStream {
    // see spec:
    // > However, we place an additional restriction that the uncompressed data
    // > in a chunk must be no longer than 65536 bytes. This allows consumers to
    // > easily use small fixed-size buffers.
    // Note: despite its name, this constant bounds the number of *uncompressed*
    // bytes buffered per chunk.
    private static final int MAX_COMPRESSED_BUFFER_SIZE = 1 << 16;

    /**
     * Masks a checksum value as done by the framing format: the low 32 bits are
     * rotated right by 15 bits, then
     * {@link FramedSnappyCompressorInputStream#MASK_OFFSET} is added, and the
     * result is truncated to 32 bits.
     *
     * @param x the checksum value; expected to lie in the unsigned 32-bit range
     *          (the {@code >>} shift assumes a non-negative value)
     * @return the masked value, in the unsigned 32-bit range
     */
    static long mask(long x) {
        // ugly, maybe we should just have used ints and deal with the
        // overflow
        x = x >> 15 | x << 17;
        x += FramedSnappyCompressorInputStream.MASK_OFFSET;
        // drops the bits shifted above position 31 by "x << 17" and any carry
        x &= 0xffffFFFFL;
        return x;
    }

    private final OutputStream out;
    private final Parameters params;
    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
    // used in one-arg write method
    private final byte[] oneByte = new byte[1];
    // uncompressed bytes waiting to be written as the next chunk
    private final byte[] buffer = new byte[MAX_COMPRESSED_BUFFER_SIZE];

    // number of valid bytes in buffer; always < buffer.length between calls
    // unless a flush failed
    private int currentIndex;

    private final ByteUtils.ByteConsumer consumer;

    /**
     * Constructs a new output stream that compresses
     * snappy-framed-compressed data to the specified output stream.
     * @param out the OutputStream to which to write the compressed data
     * @throws IOException if writing the signature fails
     */
    public FramedSnappyCompressorOutputStream(final OutputStream out) throws IOException {
        this(out, SnappyCompressorOutputStream.createParameterBuilder(SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE)
             .build());
    }

    /**
     * Constructs a new output stream that compresses
     * snappy-framed-compressed data to the specified output stream.
     * @param out the OutputStream to which to write the compressed data
     * @param params parameters used to fine-tune compression, in
     * particular to balance compression ratio vs compression speed.
     * @throws IOException if writing the signature fails
     */
    public FramedSnappyCompressorOutputStream(final OutputStream out, final Parameters params) throws IOException {
        this.out = out;
        this.params = params;
        consumer = new ByteUtils.OutputStreamByteConsumer(out);
        out.write(FramedSnappyCompressorInputStream.SZ_SIGNATURE);
    }

    /**
     * Writes any buffered data as a final chunk and closes the underlying
     * stream. The underlying stream is closed even if writing the final chunk
     * fails.
     *
     * @throws IOException if writing the final chunk or closing fails
     */
    @Override
    public void close() throws IOException {
        try {
            finish();
        } finally {
            out.close();
        }
    }

    /**
     * Compresses all remaining data and writes it to the stream,
     * doesn't close the underlying stream.
     * @throws IOException if an error occurs
     */
    public void finish() throws IOException {
        // never emit an empty chunk
        if (currentIndex > 0) {
            flushBuffer();
        }
    }

    /**
     * Compresses the first {@code currentIndex} bytes of the buffer and writes
     * them as one compressed chunk: the chunk type byte, a 3-byte little-endian
     * length (compressed payload plus the 4-byte checksum), the masked checksum
     * of the uncompressed bytes, then the compressed payload. Afterwards the
     * buffer is considered empty.
     *
     * @throws IOException if compressing or writing fails
     */
    private void flushBuffer() throws IOException {
        out.write(FramedSnappyCompressorInputStream.COMPRESSED_CHUNK_TYPE);
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (OutputStream o = new SnappyCompressorOutputStream(baos, currentIndex, params)) {
            o.write(buffer, 0, currentIndex);
        }
        final byte[] b = baos.toByteArray();
        writeLittleEndian(3, b.length + 4L /* CRC */);
        writeCrc();
        out.write(b);
        currentIndex = 0;
    }

    /**
     * Buffers {@code len} bytes of {@code data} starting at {@code off}. Every
     * time the internal buffer becomes full, its contents are written as one
     * compressed chunk. A chunk is only ever written from a full buffer here,
     * so no empty chunks are produced and chunks are filled to the maximum
     * size.
     *
     * @param data the data to write
     * @param off the start offset in {@code data}
     * @param len the number of bytes to write
     * @throws IOException if writing a chunk fails
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     *         negative or {@code off + len > data.length}
     */
    @Override
    public void write(final byte[] data, int off, int len) throws IOException {
        // validate before any chunk is written so a bad call has no side effects
        if (off < 0 || len < 0 || off > data.length - len) {
            throw new IndexOutOfBoundsException(
                "off=" + off + ", len=" + len + ", data.length=" + data.length);
        }
        while (len > 0) {
            final int copyLen = Math.min(len, buffer.length - currentIndex);
            System.arraycopy(data, off, buffer, currentIndex, copyLen);
            currentIndex += copyLen;
            off += copyLen;
            len -= copyLen;
            // also retries a buffer left full by a previously failed flush
            if (currentIndex == buffer.length) {
                flushBuffer();
            }
        }
    }

    /**
     * Writes the low eight bits of {@code b}.
     *
     * @param b the byte to write
     * @throws IOException if writing a chunk fails
     */
    @Override
    public void write(final int b) throws IOException {
        oneByte[0] = (byte) (b & 0xff);
        write(oneByte, 0, 1);
    }

    /**
     * Writes the masked checksum of the buffered uncompressed bytes as four
     * little-endian bytes and resets the checksum for the next chunk.
     *
     * @throws IOException if writing fails
     */
    private void writeCrc() throws IOException {
        checksum.update(buffer, 0, currentIndex);
        writeLittleEndian(4, mask(checksum.getValue()));
        checksum.reset();
    }

    /**
     * Writes the {@code numBytes} low-order bytes of {@code num} in
     * little-endian order to the underlying stream.
     *
     * @param numBytes how many bytes to write
     * @param num the value to write
     * @throws IOException if writing fails
     */
    private void writeLittleEndian(final int numBytes, final long num) throws IOException {
        ByteUtils.toLittleEndian(consumer, num, numBytes);
    }
}