001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import org.apache.hadoop.io.Writable; 022import org.apache.hadoop.util.ReflectionUtils; 023import org.apache.oozie.compression.CodecFactory; 024 025import java.io.ByteArrayInputStream; 026import java.io.ByteArrayOutputStream; 027import java.io.DataInputStream; 028import java.io.DataOutputStream; 029import java.io.IOException; 030import java.io.DataOutput; 031import java.io.DataInput; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037 038/** 039 * Utility class to write/read Hadoop writables to/from a byte array. 040 */ 041public class WritableUtils { 042 043 public static XLog LOG = XLog.getLog(WritableUtils.class); 044 045 /** 046 * Write a writable to a byte array. 047 * 048 * @param writable writable to write. 049 * @return array containing the serialized writable. 050 */ 051 public static byte[] toByteArray(Writable writable) { 052 try { 053 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 054 DataOutputStream daos = new DataOutputStream(baos); 055 writable.write(daos); 056 daos.close(); 057 return baos.toByteArray(); 058 } 059 catch (IOException ex) { 060 throw new RuntimeException(ex); 061 } 062 } 063 064 /** 065 * Read a writable from a byte array. 066 * 067 * @param <T> the object type 068 * @param array byte array with the serialized writable. 069 * @param clazz writable class. 070 * @return writable deserialized from the byte array. 071 */ 072 public static <T extends Writable> T fromByteArray(byte[] array, Class<T> clazz) { 073 try { 074 T o = (T) ReflectionUtils.newInstance(clazz, null); 075 o.readFields(new DataInputStream(new ByteArrayInputStream(array))); 076 return o; 077 } 078 catch (IOException ex) { 079 throw new RuntimeException(ex); 080 } 081 } 082 083 private static final String NULL = "||"; 084 085 /** 086 * Write a string to a data output supporting <code>null</code> values. <p> It uses the '||' token to represent 087 * <code>null</code>. 088 * 089 * @param dataOutput data output. 090 * @param str string to write. 091 * @throws IOException thrown if the string could not be written. 092 */ 093 public static void writeStr(DataOutput dataOutput, String str) throws IOException { 094 str = (str != null) ? str : NULL; 095 dataOutput.writeUTF(str); 096 } 097 098 /** 099 * Read a string from a data input supporting <code>null</code> values. <p> It uses the '||' token to represent 100 * <code>null</code>. 101 * 102 * @param dataInput data input. 103 * @return read string, <code>null</code> if the '||' token was read. 104 * @throws IOException thrown if the string could not be read. 105 */ 106 public static String readStr(DataInput dataInput) throws IOException { 107 String str = dataInput.readUTF(); 108 return (str.equals(NULL)) ? null : str; 109 } 110 111 /** 112 * Read list. 113 * 114 * @param <T> the generic type 115 * @param dataInput the data input 116 * @param clazz the clazz 117 * @return the list 118 * @throws IOException Signals that an I/O exception has occurred. 119 */ 120 public static <T extends Writable> List<T> readList(DataInput dataInput, Class<T> clazz) throws IOException { 121 List<T> a = new ArrayList<T>(); 122 int count = dataInput.readInt(); 123 for (int i = 0; i < count; i++) { 124 T o = (T) ReflectionUtils.newInstance(clazz, null); 125 o.readFields(dataInput); 126 a.add(o); 127 } 128 return a; 129 } 130 131 public static List<String> readStringList(DataInput dataInput) throws IOException { 132 List<String> a = new ArrayList<String>(); 133 int count = dataInput.readInt(); 134 for (int i = 0; i < count; i++) { 135 a.add(readBytesAsString(dataInput)); 136 } 137 return a; 138 } 139 140 /** 141 * Write list. 142 * 143 * @param <T> the generic type 144 * @param dataOutput the data output 145 * @param list the list 146 * @throws IOException Signals that an I/O exception has occurred. 147 */ 148 public static <T extends Writable> void writeList(DataOutput dataOutput, List<T> list) throws IOException { 149 dataOutput.writeInt(list.size()); 150 for (T t : list) { 151 t.write(dataOutput); 152 } 153 } 154 155 /** 156 * Write string list. 157 * 158 * @param dataOutput the data output 159 * @param list the list 160 * @throws IOException Signals that an I/O exception has occurred. 161 */ 162 public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException { 163 dataOutput.writeInt(list.size()); 164 for (String str : list) { 165 writeStringAsBytes(dataOutput, str); 166 } 167 } 168 169 /** 170 * Write map. 171 * 172 * @param <T> the generic type 173 * @param dataOutput the data output 174 * @param map the map 175 * @throws IOException Signals that an I/O exception has occurred. 176 */ 177 public static <T extends Writable> void writeMap(DataOutput dataOutput, Map<String, T> map) throws IOException { 178 dataOutput.writeInt(map.size()); 179 for (Entry<String, T> t : map.entrySet()) { 180 writeStringAsBytes(dataOutput, t.getKey()); 181 t.getValue().write(dataOutput); 182 } 183 } 184 185 /** 186 * Write map with list. 187 * 188 * @param <T> the generic type 189 * @param dataOutput the data output 190 * @param map the map 191 * @throws IOException Signals that an I/O exception has occurred. 192 */ 193 public static <T extends Writable> void writeMapWithList(DataOutput dataOutput, Map<String, List<T>> map) 194 throws IOException { 195 dataOutput.writeInt(map.size()); 196 for (Entry<String, List<T>> t : map.entrySet()) { 197 writeStringAsBytes(dataOutput, t.getKey()); 198 writeList(dataOutput, t.getValue()); 199 } 200 } 201 202 /** 203 * Read map. 204 * 205 * @param <T> the generic type 206 * @param dataInput the data input 207 * @param clazz the clazz 208 * @return the map 209 * @throws IOException Signals that an I/O exception has occurred. 210 */ 211 public static <T extends Writable> Map<String, T> readMap(DataInput dataInput, Class<T> clazz) throws IOException { 212 Map<String, T> map = new HashMap<String, T>(); 213 int count = dataInput.readInt(); 214 for (int i = 0; i < count; i++) { 215 String key = readBytesAsString(dataInput); 216 T value = (T) ReflectionUtils.newInstance(clazz, null); 217 value.readFields(dataInput); 218 map.put(key, value); 219 } 220 return map; 221 } 222 223 /** 224 * Read map with list. 225 * 226 * @param <T> the generic type 227 * @param dataInput the data input 228 * @param clazz the clazz 229 * @return the map 230 * @throws IOException Signals that an I/O exception has occurred. 231 */ 232 public static <T extends Writable> Map<String, List<T>> readMapWithList(DataInput dataInput, Class<T> clazz) 233 throws IOException { 234 Map<String, List<T>> map = new HashMap<String, List<T>>(); 235 int count = dataInput.readInt(); 236 for (int i = 0; i < count; i++) { 237 String key = readBytesAsString(dataInput); 238 map.put(key, readList(dataInput, clazz)); 239 } 240 return map; 241 } 242 243 public static void writeStringAsBytes(DataOutput dOut, String value) throws IOException { 244 byte[] data = value.getBytes(CodecFactory.UTF_8_ENCODING); 245 dOut.writeInt(data.length); 246 dOut.write(data); 247 } 248 249 public static String readBytesAsString(DataInput dIn) throws IOException { 250 int length = dIn.readInt(); 251 byte[] data = new byte[length]; 252 dIn.readFully(data); 253 return new String(data, CodecFactory.UTF_8_ENCODING); 254 } 255 256}