001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.compression; 019 020import java.io.ByteArrayOutputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.IOException; 024import java.io.UnsupportedEncodingException; 025import java.util.HashMap; 026import java.util.Map; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.oozie.util.XLog; 030 031/** 032 * Utility class for maintaining list of codecs and providing facility 033 * for compressing and decompressing. 034 * 035 */ 036public class CodecFactory { 037 private static final Map<String, CompressionCodec> REGISTERED = new HashMap<String, CompressionCodec>(); 038 public static final String COMPRESSION_CODECS = "oozie.compression.codecs"; 039 public static final String COMPRESSION_OUTPUT_CODEC = "oozie.output.compression.codec"; 040 private static CompressionCodec outputCompressionCodec; 041 public static final String COMPRESSION_MAGIC_DATA = "OBJ"; 042 public static final String COMPRESSION_KEY_HEADER = "codec"; 043 public static final String UTF_8_ENCODING = "UTF-8"; 044 private static boolean isEnabled; 045 private static XLog LOG = XLog.getLog(CodecFactory.class);; 046 private static byte[] headerBytes; 047 048 /** 049 * Initialize the codec factory to maintain list of codecs 050 * @param conf 051 * @throws Exception 052 */ 053 public static void initialize(Configuration conf) throws Exception { 054 String outputCompressionStr = conf.get(COMPRESSION_OUTPUT_CODEC); 055 if (outputCompressionStr == null || outputCompressionStr.trim().equalsIgnoreCase("NONE") || 056 outputCompressionStr.trim().equalsIgnoreCase("")) { 057 isEnabled = false; 058 } 059 else { 060 outputCompressionStr = outputCompressionStr.trim(); 061 isEnabled = true; 062 } 063 String[] outputCompressionCodecs = conf.getStrings(COMPRESSION_CODECS); 064 for (String comp : outputCompressionCodecs) { 065 parseCompressionConfig(comp); 066 } 067 if (isEnabled) { 068 if (REGISTERED.get(GzipCompressionCodec.CODEC_NAME) == null) { 069 REGISTERED.put(GzipCompressionCodec.CODEC_NAME, new GzipCompressionCodec()); 070 } 071 outputCompressionCodec = REGISTERED.get(outputCompressionStr); 072 if (outputCompressionCodec == null) { 073 throw new RuntimeException("No codec class found for codec " + outputCompressionStr); 074 } 075 } 076 LOG.info("Using " + outputCompressionStr + " as output compression codec"); 077 078 // Initialize header bytes 079 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 080 DataOutputStream daos = new DataOutputStream(baos); 081 // magic data 082 daos.write(COMPRESSION_MAGIC_DATA.getBytes(UTF_8_ENCODING)); 083 // version 084 daos.writeInt(1); 085 // no of key value pairs 086 daos.writeInt(1); 087 daos.writeUTF(COMPRESSION_KEY_HEADER); 088 daos.writeUTF(outputCompressionStr); 089 daos.close(); 090 headerBytes = baos.toByteArray(); 091 092 } 093 094 private static void parseCompressionConfig(String comp) throws Exception { 095 String[] compression = comp.split("=", 2); 096 if (compression.length == 2) { 097 String key = compression[0]; 098 String value = compression[1]; 099 REGISTERED.put(key, (CompressionCodec) Class.forName(value).newInstance()); 100 LOG.info("Adding [{0}] to list of output compression codecs", key); 101 } 102 else { 103 throw new IllegalArgumentException("Property " + comp + " not in key=value format" 104 + "; output compression cannot be enabled"); 105 } 106 } 107 108 private static CompressionCodec getCodec(String key) { 109 CompressionCodec codec = REGISTERED.get(key); 110 if (codec != null) { 111 return codec; 112 } 113 else { 114 throw new RuntimeException("No compression algo found corresponding to " + key); 115 } 116 } 117 118 /** 119 * Check whether compression is enabled or not 120 * @return true if compression is enabled 121 */ 122 public static boolean isCompressionEnabled() { 123 return isEnabled; 124 } 125 126 /** 127 * Get decompression codec after reading from stream 128 * @param dais the input stream 129 * @return the decompression codec 130 * @throws IOException 131 */ 132 public static CompressionCodec getDeCompressionCodec(DataInputStream dais) throws IOException { 133 byte[] buffer = new byte[COMPRESSION_MAGIC_DATA.length()]; 134 dais.read(buffer, 0, buffer.length); 135 Map<String, String> compressionProps = new HashMap<String, String>(); 136 try { 137 if (new String(buffer, UTF_8_ENCODING).equals(COMPRESSION_MAGIC_DATA)) { 138 // read Version; need to handle if multiple versions are 139 // supported 140 dais.readInt(); 141 // read no of key value pairs; need to handle if more than one 142 dais.readInt(); 143 compressionProps.put(dais.readUTF(), dais.readUTF()); 144 } 145 else { 146 dais.reset(); 147 return null; 148 } 149 } 150 catch (UnsupportedEncodingException ex) { 151 throw new RuntimeException(ex); 152 } 153 return getCodec(compressionProps.get(COMPRESSION_KEY_HEADER)); 154 } 155 156 /** 157 * Get output compression codec 158 * @return the compression codec 159 */ 160 public static CompressionCodec getCompressionCodec() { 161 return outputCompressionCodec; 162 } 163 164 /** 165 * Get header bytes 166 * @return the header bytes 167 */ 168 public static byte[] getHeaderBytes() { 169 return headerBytes; 170 } 171 172}