001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import org.apache.hadoop.io.Writable; 022import org.apache.hadoop.util.ReflectionUtils; 023import org.apache.oozie.compression.CodecFactory; 024 025import java.io.ByteArrayInputStream; 026import java.io.ByteArrayOutputStream; 027import java.io.DataInputStream; 028import java.io.DataOutputStream; 029import java.io.IOException; 030import java.io.DataOutput; 031import java.io.DataInput; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037 038/** 039 * Utility class to write/read Hadoop writables to/from a byte array. 040 */ 041public class WritableUtils { 042 043 public static XLog LOG = XLog.getLog(WritableUtils.class); 044 045 /** 046 * Write a writable to a byte array. 047 * 048 * @param writable writable to write. 049 * @return array containing the serialized writable. 050 */ 051 public static byte[] toByteArray(Writable writable) { 052 try { 053 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 054 DataOutputStream daos = new DataOutputStream(baos); 055 writable.write(daos); 056 daos.close(); 057 return baos.toByteArray(); 058 } 059 catch (IOException ex) { 060 throw new RuntimeException(ex); 061 } 062 } 063 064 /** 065 * Read a writable from a byte array. 066 * 067 * @param array byte array with the serialized writable. 068 * @param clazz writable class. 069 * @return writable deserialized from the byte array. 070 */ 071 public static <T extends Writable> T fromByteArray(byte[] array, Class<T> clazz) { 072 try { 073 T o = (T) ReflectionUtils.newInstance(clazz, null); 074 o.readFields(new DataInputStream(new ByteArrayInputStream(array))); 075 return o; 076 } 077 catch (IOException ex) { 078 throw new RuntimeException(ex); 079 } 080 } 081 082 private static final String NULL = "||"; 083 084 /** 085 * Write a string to a data output supporting <code>null</code> values. <p> It uses the '||' token to represent 086 * <code>null</code>. 087 * 088 * @param dataOutput data output. 089 * @param str string to write. 090 * @throws IOException thrown if the string could not be written. 091 */ 092 public static void writeStr(DataOutput dataOutput, String str) throws IOException { 093 str = (str != null) ? str : NULL; 094 dataOutput.writeUTF(str); 095 } 096 097 /** 098 * Read a string from a data input supporting <code>null</code> values. <p> It uses the '||' token to represent 099 * <code>null</code>. 100 * 101 * @param dataInput data input. 102 * @return read string, <code>null</code> if the '||' token was read. 103 * @throws IOException thrown if the string could not be read. 104 */ 105 public static String readStr(DataInput dataInput) throws IOException { 106 String str = dataInput.readUTF(); 107 return (str.equals(NULL)) ? null : str; 108 } 109 110 /** 111 * Read list. 112 * 113 * @param <T> the generic type 114 * @param dataInput the data input 115 * @param clazz the clazz 116 * @return the list 117 * @throws IOException Signals that an I/O exception has occurred. 118 */ 119 public static <T extends Writable> List<T> readList(DataInput dataInput, Class<T> clazz) throws IOException { 120 List<T> a = new ArrayList<T>(); 121 int count = dataInput.readInt(); 122 for (int i = 0; i < count; i++) { 123 T o = (T) ReflectionUtils.newInstance(clazz, null); 124 o.readFields(dataInput); 125 a.add(o); 126 } 127 return a; 128 } 129 130 public static List<String> readStringList(DataInput dataInput) throws IOException { 131 List<String> a = new ArrayList<String>(); 132 int count = dataInput.readInt(); 133 for (int i = 0; i < count; i++) { 134 a.add(readBytesAsString(dataInput)); 135 } 136 return a; 137 } 138 139 /** 140 * Write list. 141 * 142 * @param <T> the generic type 143 * @param dataOutput the data output 144 * @param list the list 145 * @throws IOException Signals that an I/O exception has occurred. 146 */ 147 public static <T extends Writable> void writeList(DataOutput dataOutput, List<T> list) throws IOException { 148 dataOutput.writeInt(list.size()); 149 for (T t : list) { 150 t.write(dataOutput); 151 } 152 } 153 154 public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException { 155 dataOutput.writeInt(list.size()); 156 for (String str : list) { 157 writeStringAsBytes(dataOutput, str); 158 } 159 } 160 161 /** 162 * Write map. 163 * 164 * @param <T> the generic type 165 * @param dataOutput the data output 166 * @param map the map 167 * @throws IOException Signals that an I/O exception has occurred. 168 */ 169 public static <T extends Writable> void writeMap(DataOutput dataOutput, Map<String, T> map) throws IOException { 170 dataOutput.writeInt(map.size()); 171 for (Entry<String, T> t : map.entrySet()) { 172 writeStringAsBytes(dataOutput, t.getKey()); 173 t.getValue().write(dataOutput); 174 } 175 } 176 177 /** 178 * Write map with list. 179 * 180 * @param <T> the generic type 181 * @param dataOutput the data output 182 * @param map the map 183 * @throws IOException Signals that an I/O exception has occurred. 184 */ 185 public static <T extends Writable> void writeMapWithList(DataOutput dataOutput, Map<String, List<T>> map) 186 throws IOException { 187 dataOutput.writeInt(map.size()); 188 for (Entry<String, List<T>> t : map.entrySet()) { 189 writeStringAsBytes(dataOutput, t.getKey()); 190 writeList(dataOutput, t.getValue()); 191 } 192 } 193 194 /** 195 * Read map. 196 * 197 * @param <T> the generic type 198 * @param dataInput the data input 199 * @param clazz the clazz 200 * @return the map 201 * @throws IOException Signals that an I/O exception has occurred. 202 */ 203 public static <T extends Writable> Map<String, T> readMap(DataInput dataInput, Class<T> clazz) throws IOException { 204 Map<String, T> map = new HashMap<String, T>(); 205 int count = dataInput.readInt(); 206 for (int i = 0; i < count; i++) { 207 String key = readBytesAsString(dataInput); 208 T value = (T) ReflectionUtils.newInstance(clazz, null); 209 value.readFields(dataInput); 210 map.put(key, value); 211 } 212 return map; 213 } 214 215 /** 216 * Read map with list. 217 * 218 * @param <T> the generic type 219 * @param dataInput the data input 220 * @param clazz the clazz 221 * @return the map 222 * @throws IOException Signals that an I/O exception has occurred. 223 */ 224 public static <T extends Writable> Map<String, List<T>> readMapWithList(DataInput dataInput, Class<T> clazz) 225 throws IOException { 226 Map<String, List<T>> map = new HashMap<String, List<T>>(); 227 int count = dataInput.readInt(); 228 for (int i = 0; i < count; i++) { 229 String key = readBytesAsString(dataInput); 230 map.put(key, readList(dataInput, clazz)); 231 } 232 return map; 233 } 234 235 public static void writeStringAsBytes(DataOutput dOut, String value) throws IOException { 236 byte[] data = value.getBytes(CodecFactory.UTF_8_ENCODING); 237 dOut.writeInt(data.length); 238 dOut.write(data); 239 } 240 241 public static String readBytesAsString(DataInput dIn) throws IOException { 242 int length = dIn.readInt(); 243 byte[] data = new byte[length]; 244 dIn.readFully(data); 245 return new String(data, CodecFactory.UTF_8_ENCODING); 246 } 247 248}