001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.compression;
019
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.io.UnsupportedEncodingException;
025import java.util.HashMap;
026import java.util.Map;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.oozie.util.XLog;
030
031/**
032 *  Utility class for maintaining list of codecs and providing facility
033 *  for compressing and decompressing.
034 *
035 */
036public class CodecFactory {
037    private static final Map<String, CompressionCodec> REGISTERED = new HashMap<String, CompressionCodec>();
038    public static final String COMPRESSION_CODECS = "oozie.compression.codecs";
039    public static final String COMPRESSION_OUTPUT_CODEC = "oozie.output.compression.codec";
040    private static CompressionCodec outputCompressionCodec;
041    public static final String COMPRESSION_MAGIC_DATA = "OBJ";
042    public static final String COMPRESSION_KEY_HEADER = "codec";
043    public static final String UTF_8_ENCODING = "UTF-8";
044    private static boolean isEnabled;
045    private static XLog LOG = XLog.getLog(CodecFactory.class);;
046    private static byte[] headerBytes;
047
048    /**
049     * Initialize the codec factory to maintain list of codecs
050     * @param conf
051     * @throws Exception
052     */
053    public static void initialize(Configuration conf) throws Exception {
054        String outputCompressionStr = conf.get(COMPRESSION_OUTPUT_CODEC);
055        if (outputCompressionStr == null || outputCompressionStr.trim().equalsIgnoreCase("NONE") ||
056                outputCompressionStr.trim().equalsIgnoreCase("")) {
057            isEnabled = false;
058        }
059        else {
060            outputCompressionStr = outputCompressionStr.trim();
061            isEnabled = true;
062        }
063        String[] outputCompressionCodecs = conf.getStrings(COMPRESSION_CODECS);
064        for (String comp : outputCompressionCodecs) {
065            parseCompressionConfig(comp);
066        }
067        if (isEnabled) {
068            if (REGISTERED.get(GzipCompressionCodec.CODEC_NAME) == null) {
069                REGISTERED.put(GzipCompressionCodec.CODEC_NAME, new GzipCompressionCodec());
070            }
071            outputCompressionCodec = REGISTERED.get(outputCompressionStr);
072            if (outputCompressionCodec == null) {
073                throw new RuntimeException("No codec class found for codec " + outputCompressionStr);
074            }
075        }
076        LOG.info("Using " + outputCompressionStr + " as output compression codec");
077
078        // Initialize header bytes
079        ByteArrayOutputStream baos = new ByteArrayOutputStream();
080        DataOutputStream daos = new DataOutputStream(baos);
081        // magic data
082        daos.write(COMPRESSION_MAGIC_DATA.getBytes(UTF_8_ENCODING));
083        // version
084        daos.writeInt(1);
085        // no of key value pairs
086        daos.writeInt(1);
087        daos.writeUTF(COMPRESSION_KEY_HEADER);
088        daos.writeUTF(outputCompressionStr);
089        daos.close();
090        headerBytes = baos.toByteArray();
091
092    }
093
094    private static void parseCompressionConfig(String comp) throws Exception {
095        String[] compression = comp.split("=", 2);
096        if (compression.length == 2) {
097            String key = compression[0];
098            String value = compression[1];
099            REGISTERED.put(key, (CompressionCodec) Class.forName(value).newInstance());
100            LOG.info("Adding [{0}] to list of output compression codecs", key);
101        }
102        else {
103            throw new IllegalArgumentException("Property " + comp + " not in key=value format"
104                    + "; output compression cannot be enabled");
105        }
106    }
107
108    private static CompressionCodec getCodec(String key) {
109        CompressionCodec codec = REGISTERED.get(key);
110        if (codec != null) {
111            return codec;
112        }
113        else {
114            throw new RuntimeException("No compression algo found corresponding to " + key);
115        }
116    }
117
118    /**
119     * Check whether compression is enabled or not
120     * @return true if compression is enabled
121     */
122    public static boolean isCompressionEnabled() {
123        return isEnabled;
124    }
125
126    /**
127     * Get decompression codec after reading from stream
128     * @param dais the input stream
129     * @return the decompression codec
130     * @throws IOException
131     */
132    public static CompressionCodec getDeCompressionCodec(DataInputStream dais) throws IOException {
133        byte[] buffer = new byte[COMPRESSION_MAGIC_DATA.length()];
134        dais.read(buffer, 0, buffer.length);
135        Map<String, String> compressionProps = new HashMap<String, String>();
136        try {
137            if (new String(buffer, UTF_8_ENCODING).equals(COMPRESSION_MAGIC_DATA)) {
138                // read Version; need to handle if multiple versions are
139                // supported
140                dais.readInt();
141                // read no of key value pairs; need to handle if more than one
142                dais.readInt();
143                compressionProps.put(dais.readUTF(), dais.readUTF());
144            }
145            else {
146                dais.reset();
147                return null;
148            }
149        }
150        catch (UnsupportedEncodingException ex) {
151            throw new RuntimeException(ex);
152        }
153        return getCodec(compressionProps.get(COMPRESSION_KEY_HEADER));
154    }
155
156    /**
157     * Get output compression codec
158     * @return the compression codec
159     */
160    public static CompressionCodec getCompressionCodec() {
161        return outputCompressionCodec;
162    }
163
164    /**
165     * Get header bytes
166     * @return the header bytes
167     */
168    public static byte[] getHeaderBytes() {
169        return headerBytes;
170    }
171
172}