/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.compression;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.util.XLog;

/**
 * Utility class for maintaining the list of registered compression codecs and
 * providing facilities for compressing and decompressing.
 * <p>
 * {@link #initialize(Configuration)} must be called once before any other
 * method; it populates the codec registry, selects the output codec, and
 * pre-computes the header bytes written in front of compressed payloads.
 */
public class CodecFactory {
    private static final Map<String, CompressionCodec> REGISTERED = new HashMap<String, CompressionCodec>();
    public static final String COMPRESSION_CODECS = "oozie.compression.codecs";
    public static final String COMPRESSION_OUTPUT_CODEC = "oozie.output.compression.codec";
    private static CompressionCodec outputCompressionCodec;
    public static final String COMPRESSION_MAGIC_DATA = "OBJ";
    public static final String COMPRESSION_KEY_HEADER = "codec";
    public static final String UTF_8_ENCODING = "UTF-8";
    private static boolean isEnabled;
    private static final XLog LOG = XLog.getLog(CodecFactory.class);
    private static byte[] headerBytes;

    private CodecFactory() {
        // Utility class with static methods only; not meant to be instantiated.
    }

    /**
     * Initialize the codec factory: register the configured codecs, select the
     * output compression codec, and pre-compute the compression header bytes.
     *
     * @param conf configuration holding {@link #COMPRESSION_CODECS} (list of
     *        {@code name=className} entries) and
     *        {@link #COMPRESSION_OUTPUT_CODEC} (codec name, or "NONE"/empty to
     *        disable output compression)
     * @throws Exception if a configured codec entry is malformed or its class
     *         cannot be loaded or instantiated
     */
    public static void initialize(Configuration conf) throws Exception {
        String outputCompressionStr = conf.get(COMPRESSION_OUTPUT_CODEC);
        // Output compression is disabled when the property is unset, empty, or "NONE".
        if (outputCompressionStr == null || outputCompressionStr.trim().equalsIgnoreCase("NONE") ||
                outputCompressionStr.trim().equalsIgnoreCase("")) {
            isEnabled = false;
        }
        else {
            outputCompressionStr = outputCompressionStr.trim();
            isEnabled = true;
        }
        // getStrings() returns null when the property is absent; guard against NPE.
        String[] outputCompressionCodecs = conf.getStrings(COMPRESSION_CODECS);
        if (outputCompressionCodecs != null) {
            for (String comp : outputCompressionCodecs) {
                parseCompressionConfig(comp);
            }
        }
        if (isEnabled) {
            // Gzip is always available as a fallback codec.
            if (REGISTERED.get(GzipCompressionCodec.CODEC_NAME) == null) {
                REGISTERED.put(GzipCompressionCodec.CODEC_NAME, new GzipCompressionCodec());
            }
            outputCompressionCodec = REGISTERED.get(outputCompressionStr);
            if (outputCompressionCodec == null) {
                throw new RuntimeException("No codec class found for codec " + outputCompressionStr);
            }
            LOG.info("Using " + outputCompressionStr + " as output compression codec");
        }

        // Pre-compute the header written in front of every compressed payload:
        // magic data, format version, and a single codec=<name> property.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream daos = new DataOutputStream(baos);
        try {
            // magic data
            daos.write(COMPRESSION_MAGIC_DATA.getBytes(UTF_8_ENCODING));
            // version
            daos.writeInt(1);
            // no of key value pairs
            daos.writeInt(1);
            daos.writeUTF(COMPRESSION_KEY_HEADER);
            // writeUTF(null) would throw NPE; when compression is disabled the
            // header is never used, so an empty codec name is safe here.
            daos.writeUTF(outputCompressionStr == null ? "" : outputCompressionStr);
        }
        finally {
            daos.close();
        }
        headerBytes = baos.toByteArray();
    }

    /**
     * Register one codec from a {@code name=className} configuration entry.
     *
     * @param comp configuration entry in {@code key=value} form
     * @throws Exception if the entry is not in {@code key=value} form or the
     *         codec class cannot be loaded or instantiated
     */
    private static void parseCompressionConfig(String comp) throws Exception {
        String[] compression = comp.split("=", 2);
        if (compression.length == 2) {
            String key = compression[0];
            String value = compression[1];
            REGISTERED.put(key, (CompressionCodec) Class.forName(value).newInstance());
            LOG.info("Adding [{0}] to list of output compression codecs", key);
        }
        else {
            throw new IllegalArgumentException("Property " + comp + " not in key=value format"
                    + "; output compression cannot be enabled");
        }
    }

    /**
     * Look up a registered codec by name.
     *
     * @param key codec name
     * @return the registered codec, never null
     * @throws RuntimeException if no codec is registered under the given name
     */
    private static CompressionCodec getCodec(String key) {
        CompressionCodec codec = REGISTERED.get(key);
        if (codec != null) {
            return codec;
        }
        else {
            throw new RuntimeException("No compression algo found corresponding to " + key);
        }
    }

    /**
     * Check whether output compression is enabled.
     *
     * @return true if compression is enabled
     */
    public static boolean isCompressionEnabled() {
        return isEnabled;
    }

    /**
     * Read the compression header from the stream and return the matching
     * decompression codec.
     * <p>
     * NOTE(review): on non-compressed data this calls {@code dais.reset()},
     * which requires the caller to have marked the stream beforehand —
     * presumed contract, confirm against callers.
     *
     * @param dais the input stream, positioned at a possible compression header
     * @return the decompression codec, or null if the stream does not start
     *         with the compression magic data (stream is reset in that case)
     * @throws IOException if reading from the stream fails
     */
    public static CompressionCodec getDeCompressionCodec(DataInputStream dais) throws IOException {
        byte[] buffer = new byte[COMPRESSION_MAGIC_DATA.length()];
        // Fill the buffer completely: a single read() may legally return fewer
        // bytes than requested even when more are available.
        int offset = 0;
        while (offset < buffer.length) {
            int read = dais.read(buffer, offset, buffer.length - offset);
            if (read == -1) {
                // Stream shorter than the magic header: not compressed data.
                dais.reset();
                return null;
            }
            offset += read;
        }
        Map<String, String> compressionProps = new HashMap<String, String>();
        try {
            if (new String(buffer, UTF_8_ENCODING).equals(COMPRESSION_MAGIC_DATA)) {
                // read version; need to handle if multiple versions are supported
                dais.readInt();
                // read no of key value pairs; need to handle if more than one
                dais.readInt();
                compressionProps.put(dais.readUTF(), dais.readUTF());
            }
            else {
                // Not a compression header: rewind so the caller can re-read the bytes.
                dais.reset();
                return null;
            }
        }
        catch (UnsupportedEncodingException ex) {
            // UTF-8 is guaranteed by the JVM spec; this cannot realistically happen.
            throw new RuntimeException(ex);
        }
        return getCodec(compressionProps.get(COMPRESSION_KEY_HEADER));
    }

    /**
     * Get the configured output compression codec.
     *
     * @return the compression codec, or null if compression is disabled
     */
    public static CompressionCodec getCompressionCodec() {
        return outputCompressionCodec;
    }

    /**
     * Get the pre-computed compression header bytes.
     *
     * @return the header bytes (magic data, version, and codec property)
     */
    public static byte[] getHeaderBytes() {
        return headerBytes;
    }

}