001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.compression;
020
021import java.io.ByteArrayOutputStream;
022import java.io.DataInputStream;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.io.UnsupportedEncodingException;
026import java.util.HashMap;
027import java.util.Map;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.oozie.util.XLog;
031
032/**
033 *  Utility class for maintaining list of codecs and providing facility
034 *  for compressing and decompressing.
035 *
036 */
037public class CodecFactory {
038    private static final Map<String, CompressionCodec> REGISTERED = new HashMap<String, CompressionCodec>();
039    public static final String COMPRESSION_CODECS = "oozie.compression.codecs";
040    public static final String COMPRESSION_OUTPUT_CODEC = "oozie.output.compression.codec";
041    private static CompressionCodec outputCompressionCodec;
042    public static final String COMPRESSION_MAGIC_DATA = "OBJ";
043    public static final String COMPRESSION_KEY_HEADER = "codec";
044    public static final String UTF_8_ENCODING = "UTF-8";
045    private static boolean isEnabled;
046    private static XLog LOG = XLog.getLog(CodecFactory.class);;
047    private static byte[] headerBytes;
048
049    /**
050     * Initialize the codec factory to maintain list of codecs
051     * @param conf
052     * @throws Exception
053     */
054    public static void initialize(Configuration conf) throws Exception {
055        String outputCompressionStr = conf.get(COMPRESSION_OUTPUT_CODEC);
056        if (outputCompressionStr == null || outputCompressionStr.trim().equalsIgnoreCase("NONE") ||
057                outputCompressionStr.trim().equalsIgnoreCase("")) {
058            isEnabled = false;
059        }
060        else {
061            outputCompressionStr = outputCompressionStr.trim();
062            isEnabled = true;
063        }
064        String[] outputCompressionCodecs = conf.getStrings(COMPRESSION_CODECS);
065        for (String comp : outputCompressionCodecs) {
066            parseCompressionConfig(comp);
067        }
068        if (isEnabled) {
069            if (REGISTERED.get(GzipCompressionCodec.CODEC_NAME) == null) {
070                REGISTERED.put(GzipCompressionCodec.CODEC_NAME, new GzipCompressionCodec());
071            }
072            outputCompressionCodec = REGISTERED.get(outputCompressionStr);
073            if (outputCompressionCodec == null) {
074                throw new RuntimeException("No codec class found for codec " + outputCompressionStr);
075            }
076        }
077        LOG.info("Using " + outputCompressionStr + " as output compression codec");
078
079        // Initialize header bytes
080        ByteArrayOutputStream baos = new ByteArrayOutputStream();
081        DataOutputStream daos = new DataOutputStream(baos);
082        // magic data
083        daos.write(COMPRESSION_MAGIC_DATA.getBytes(UTF_8_ENCODING));
084        // version
085        daos.writeInt(1);
086        // no of key value pairs
087        daos.writeInt(1);
088        daos.writeUTF(COMPRESSION_KEY_HEADER);
089        daos.writeUTF(outputCompressionStr);
090        daos.close();
091        headerBytes = baos.toByteArray();
092
093    }
094
095    private static void parseCompressionConfig(String comp) throws Exception {
096        String[] compression = comp.split("=", 2);
097        if (compression.length == 2) {
098            String key = compression[0];
099            String value = compression[1];
100            REGISTERED.put(key, (CompressionCodec) Class.forName(value).newInstance());
101            LOG.info("Adding [{0}] to list of output compression codecs", key);
102        }
103        else {
104            throw new IllegalArgumentException("Property " + comp + " not in key=value format"
105                    + "; output compression cannot be enabled");
106        }
107    }
108
109    private static CompressionCodec getCodec(String key) {
110        CompressionCodec codec = REGISTERED.get(key);
111        if (codec != null) {
112            return codec;
113        }
114        else {
115            throw new RuntimeException("No compression algo found corresponding to " + key);
116        }
117    }
118
119    /**
120     * Check whether compression is enabled or not
121     * @return true if compression is enabled
122     */
123    public static boolean isCompressionEnabled() {
124        return isEnabled;
125    }
126
127    /**
128     * Get decompression codec after reading from stream
129     * @param dais the input stream
130     * @return the decompression codec
131     * @throws IOException
132     */
133    public static CompressionCodec getDeCompressionCodec(DataInputStream dais) throws IOException {
134        byte[] buffer = new byte[COMPRESSION_MAGIC_DATA.length()];
135        dais.read(buffer, 0, buffer.length);
136        Map<String, String> compressionProps = new HashMap<String, String>();
137        try {
138            if (new String(buffer, UTF_8_ENCODING).equals(COMPRESSION_MAGIC_DATA)) {
139                // read Version; need to handle if multiple versions are
140                // supported
141                dais.readInt();
142                // read no of key value pairs; need to handle if more than one
143                dais.readInt();
144                compressionProps.put(dais.readUTF(), dais.readUTF());
145            }
146            else {
147                dais.reset();
148                return null;
149            }
150        }
151        catch (UnsupportedEncodingException ex) {
152            throw new RuntimeException(ex);
153        }
154        return getCodec(compressionProps.get(COMPRESSION_KEY_HEADER));
155    }
156
157    /**
158     * Get output compression codec
159     * @return the compression codec
160     */
161    public static CompressionCodec getCompressionCodec() {
162        return outputCompressionCodec;
163    }
164
165    /**
166     * Get header bytes
167     * @return the header bytes
168     */
169    public static byte[] getHeaderBytes() {
170        return headerBytes;
171    }
172
173}