001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import org.apache.hadoop.io.Writable;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.oozie.compression.CodecFactory;
024
025import java.io.ByteArrayInputStream;
026import java.io.ByteArrayOutputStream;
027import java.io.DataInputStream;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.io.DataOutput;
031import java.io.DataInput;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037
038/**
039 * Utility class to write/read Hadoop writables to/from a byte array.
040 */
041public class WritableUtils {
042
043    public static XLog LOG = XLog.getLog(WritableUtils.class);
044
045    /**
046     * Write a writable to a byte array.
047     *
048     * @param writable writable to write.
049     * @return array containing the serialized writable.
050     */
051    public static byte[] toByteArray(Writable writable) {
052        try {
053            ByteArrayOutputStream baos = new ByteArrayOutputStream();
054            DataOutputStream daos = new DataOutputStream(baos);
055            writable.write(daos);
056            daos.close();
057            return baos.toByteArray();
058        }
059        catch (IOException ex) {
060            throw new RuntimeException(ex);
061        }
062    }
063
064    /**
065     * Read a writable from a byte array.
066     *
067     * @param array byte array with the serialized writable.
068     * @param clazz writable class.
069     * @return writable deserialized from the byte array.
070     */
071    public static <T extends Writable> T fromByteArray(byte[] array, Class<T> clazz) {
072        try {
073            T o = (T) ReflectionUtils.newInstance(clazz, null);
074            o.readFields(new DataInputStream(new ByteArrayInputStream(array)));
075            return o;
076        }
077        catch (IOException ex) {
078            throw new RuntimeException(ex);
079        }
080    }
081
082    private static final String NULL = "||";
083
084    /**
085     * Write a string to a data output supporting <code>null</code> values. <p> It uses the '||' token to represent
086     * <code>null</code>.
087     *
088     * @param dataOutput data output.
089     * @param str string to write.
090     * @throws IOException thrown if the string could not be written.
091     */
092    public static void writeStr(DataOutput dataOutput, String str) throws IOException {
093        str = (str != null) ? str : NULL;
094        dataOutput.writeUTF(str);
095    }
096
097    /**
098     * Read a string from a data input supporting <code>null</code> values. <p> It uses the '||' token to represent
099     * <code>null</code>.
100     *
101     * @param dataInput data input.
102     * @return read string, <code>null</code> if the '||' token was read.
103     * @throws IOException thrown if the string could not be read.
104     */
105    public static String readStr(DataInput dataInput) throws IOException {
106        String str = dataInput.readUTF();
107        return (str.equals(NULL)) ? null : str;
108    }
109
110    /**
111     * Read list.
112     *
113     * @param <T> the generic type
114     * @param dataInput the data input
115     * @param clazz the clazz
116     * @return the list
117     * @throws IOException Signals that an I/O exception has occurred.
118     */
119    public static <T extends Writable> List<T> readList(DataInput dataInput, Class<T> clazz) throws IOException {
120        List<T> a = new ArrayList<T>();
121        int count = dataInput.readInt();
122        for (int i = 0; i < count; i++) {
123            T o = (T) ReflectionUtils.newInstance(clazz, null);
124            o.readFields(dataInput);
125            a.add(o);
126        }
127        return a;
128    }
129
130    public static List<String> readStringList(DataInput dataInput) throws IOException {
131        List<String> a = new ArrayList<String>();
132        int count = dataInput.readInt();
133        for (int i = 0; i < count; i++) {
134            a.add(readBytesAsString(dataInput));
135        }
136        return a;
137    }
138
139    /**
140     * Write list.
141     *
142     * @param <T> the generic type
143     * @param dataOutput the data output
144     * @param list the list
145     * @throws IOException Signals that an I/O exception has occurred.
146     */
147    public static <T extends Writable> void writeList(DataOutput dataOutput, List<T> list) throws IOException {
148        dataOutput.writeInt(list.size());
149        for (T t : list) {
150            t.write(dataOutput);
151        }
152    }
153
154    public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException {
155        dataOutput.writeInt(list.size());
156        for (String str : list) {
157            writeStringAsBytes(dataOutput, str);
158        }
159    }
160
161    /**
162     * Write map.
163     *
164     * @param <T> the generic type
165     * @param dataOutput the data output
166     * @param map the map
167     * @throws IOException Signals that an I/O exception has occurred.
168     */
169    public static <T extends Writable> void writeMap(DataOutput dataOutput, Map<String, T> map) throws IOException {
170        dataOutput.writeInt(map.size());
171        for (Entry<String, T> t : map.entrySet()) {
172            writeStringAsBytes(dataOutput, t.getKey());
173            t.getValue().write(dataOutput);
174        }
175    }
176
177    /**
178     * Write map with list.
179     *
180     * @param <T> the generic type
181     * @param dataOutput the data output
182     * @param map the map
183     * @throws IOException Signals that an I/O exception has occurred.
184     */
185    public static <T extends Writable> void writeMapWithList(DataOutput dataOutput, Map<String, List<T>> map)
186            throws IOException {
187        dataOutput.writeInt(map.size());
188        for (Entry<String, List<T>> t : map.entrySet()) {
189            writeStringAsBytes(dataOutput, t.getKey());
190            writeList(dataOutput, t.getValue());
191        }
192    }
193
194    /**
195     * Read map.
196     *
197     * @param <T> the generic type
198     * @param dataInput the data input
199     * @param clazz the clazz
200     * @return the map
201     * @throws IOException Signals that an I/O exception has occurred.
202     */
203    public static <T extends Writable> Map<String, T> readMap(DataInput dataInput, Class<T> clazz) throws IOException {
204        Map<String, T> map = new HashMap<String, T>();
205        int count = dataInput.readInt();
206        for (int i = 0; i < count; i++) {
207            String key = readBytesAsString(dataInput);
208            T value = (T) ReflectionUtils.newInstance(clazz, null);
209            value.readFields(dataInput);
210            map.put(key, value);
211        }
212        return map;
213    }
214
215    /**
216     * Read map with list.
217     *
218     * @param <T> the generic type
219     * @param dataInput the data input
220     * @param clazz the clazz
221     * @return the map
222     * @throws IOException Signals that an I/O exception has occurred.
223     */
224    public static <T extends Writable> Map<String, List<T>> readMapWithList(DataInput dataInput, Class<T> clazz)
225            throws IOException {
226        Map<String, List<T>> map = new HashMap<String, List<T>>();
227        int count = dataInput.readInt();
228        for (int i = 0; i < count; i++) {
229            String key = readBytesAsString(dataInput);
230            map.put(key, readList(dataInput, clazz));
231        }
232        return map;
233    }
234
235    public static void writeStringAsBytes(DataOutput dOut, String value) throws IOException {
236        byte[] data = value.getBytes(CodecFactory.UTF_8_ENCODING);
237        dOut.writeInt(data.length);
238        dOut.write(data);
239    }
240
241    public static String readBytesAsString(DataInput dIn) throws IOException {
242        int length = dIn.readInt();
243        byte[] data = new byte[length];
244        dIn.readFully(data);
245        return new String(data, CodecFactory.UTF_8_ENCODING);
246    }
247
248}