001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import org.apache.hadoop.io.Writable;
022import org.apache.hadoop.util.ReflectionUtils;
023import org.apache.oozie.compression.CodecFactory;
024
025import java.io.ByteArrayInputStream;
026import java.io.ByteArrayOutputStream;
027import java.io.DataInputStream;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.io.DataOutput;
031import java.io.DataInput;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037
038/**
039 * Utility class to write/read Hadoop writables to/from a byte array.
040 */
041public class WritableUtils {
042
043    public static XLog LOG = XLog.getLog(WritableUtils.class);
044
045    /**
046     * Write a writable to a byte array.
047     *
048     * @param writable writable to write.
049     * @return array containing the serialized writable.
050     */
051    public static byte[] toByteArray(Writable writable) {
052        try {
053            ByteArrayOutputStream baos = new ByteArrayOutputStream();
054            DataOutputStream daos = new DataOutputStream(baos);
055            writable.write(daos);
056            daos.close();
057            return baos.toByteArray();
058        }
059        catch (IOException ex) {
060            throw new RuntimeException(ex);
061        }
062    }
063
064    /**
065     * Read a writable from a byte array.
066     *
067     * @param <T> the object type
068     * @param array byte array with the serialized writable.
069     * @param clazz writable class.
070     * @return writable deserialized from the byte array.
071     */
072    public static <T extends Writable> T fromByteArray(byte[] array, Class<T> clazz) {
073        try {
074            T o = (T) ReflectionUtils.newInstance(clazz, null);
075            o.readFields(new DataInputStream(new ByteArrayInputStream(array)));
076            return o;
077        }
078        catch (IOException ex) {
079            throw new RuntimeException(ex);
080        }
081    }
082
083    private static final String NULL = "||";
084
085    /**
086     * Write a string to a data output supporting <code>null</code> values. <p> It uses the '||' token to represent
087     * <code>null</code>.
088     *
089     * @param dataOutput data output.
090     * @param str string to write.
091     * @throws IOException thrown if the string could not be written.
092     */
093    public static void writeStr(DataOutput dataOutput, String str) throws IOException {
094        str = (str != null) ? str : NULL;
095        dataOutput.writeUTF(str);
096    }
097
098    /**
099     * Read a string from a data input supporting <code>null</code> values. <p> It uses the '||' token to represent
100     * <code>null</code>.
101     *
102     * @param dataInput data input.
103     * @return read string, <code>null</code> if the '||' token was read.
104     * @throws IOException thrown if the string could not be read.
105     */
106    public static String readStr(DataInput dataInput) throws IOException {
107        String str = dataInput.readUTF();
108        return (str.equals(NULL)) ? null : str;
109    }
110
111    /**
112     * Read list.
113     *
114     * @param <T> the generic type
115     * @param dataInput the data input
116     * @param clazz the clazz
117     * @return the list
118     * @throws IOException Signals that an I/O exception has occurred.
119     */
120    public static <T extends Writable> List<T> readList(DataInput dataInput, Class<T> clazz) throws IOException {
121        List<T> a = new ArrayList<T>();
122        int count = dataInput.readInt();
123        for (int i = 0; i < count; i++) {
124            T o = (T) ReflectionUtils.newInstance(clazz, null);
125            o.readFields(dataInput);
126            a.add(o);
127        }
128        return a;
129    }
130
131    public static List<String> readStringList(DataInput dataInput) throws IOException {
132        List<String> a = new ArrayList<String>();
133        int count = dataInput.readInt();
134        for (int i = 0; i < count; i++) {
135            a.add(readBytesAsString(dataInput));
136        }
137        return a;
138    }
139
140    /**
141     * Write list.
142     *
143     * @param <T> the generic type
144     * @param dataOutput the data output
145     * @param list the list
146     * @throws IOException Signals that an I/O exception has occurred.
147     */
148    public static <T extends Writable> void writeList(DataOutput dataOutput, List<T> list) throws IOException {
149        dataOutput.writeInt(list.size());
150        for (T t : list) {
151            t.write(dataOutput);
152        }
153    }
154
155    /**
156     * Write string list.
157     *
158     * @param dataOutput the data output
159     * @param list the list
160     * @throws IOException Signals that an I/O exception has occurred.
161     */
162    public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException {
163        dataOutput.writeInt(list.size());
164        for (String str : list) {
165            writeStringAsBytes(dataOutput, str);
166        }
167    }
168
169    /**
170     * Write map.
171     *
172     * @param <T> the generic type
173     * @param dataOutput the data output
174     * @param map the map
175     * @throws IOException Signals that an I/O exception has occurred.
176     */
177    public static <T extends Writable> void writeMap(DataOutput dataOutput, Map<String, T> map) throws IOException {
178        dataOutput.writeInt(map.size());
179        for (Entry<String, T> t : map.entrySet()) {
180            writeStringAsBytes(dataOutput, t.getKey());
181            t.getValue().write(dataOutput);
182        }
183    }
184
185    /**
186     * Write map with list.
187     *
188     * @param <T> the generic type
189     * @param dataOutput the data output
190     * @param map the map
191     * @throws IOException Signals that an I/O exception has occurred.
192     */
193    public static <T extends Writable> void writeMapWithList(DataOutput dataOutput, Map<String, List<T>> map)
194            throws IOException {
195        dataOutput.writeInt(map.size());
196        for (Entry<String, List<T>> t : map.entrySet()) {
197            writeStringAsBytes(dataOutput, t.getKey());
198            writeList(dataOutput, t.getValue());
199        }
200    }
201
202    /**
203     * Read map.
204     *
205     * @param <T> the generic type
206     * @param dataInput the data input
207     * @param clazz the clazz
208     * @return the map
209     * @throws IOException Signals that an I/O exception has occurred.
210     */
211    public static <T extends Writable> Map<String, T> readMap(DataInput dataInput, Class<T> clazz) throws IOException {
212        Map<String, T> map = new HashMap<String, T>();
213        int count = dataInput.readInt();
214        for (int i = 0; i < count; i++) {
215            String key = readBytesAsString(dataInput);
216            T value = (T) ReflectionUtils.newInstance(clazz, null);
217            value.readFields(dataInput);
218            map.put(key, value);
219        }
220        return map;
221    }
222
223    /**
224     * Read map with list.
225     *
226     * @param <T> the generic type
227     * @param dataInput the data input
228     * @param clazz the clazz
229     * @return the map
230     * @throws IOException Signals that an I/O exception has occurred.
231     */
232    public static <T extends Writable> Map<String, List<T>> readMapWithList(DataInput dataInput, Class<T> clazz)
233            throws IOException {
234        Map<String, List<T>> map = new HashMap<String, List<T>>();
235        int count = dataInput.readInt();
236        for (int i = 0; i < count; i++) {
237            String key = readBytesAsString(dataInput);
238            map.put(key, readList(dataInput, clazz));
239        }
240        return map;
241    }
242
243    public static void writeStringAsBytes(DataOutput dOut, String value) throws IOException {
244        byte[] data = value.getBytes(CodecFactory.UTF_8_ENCODING);
245        dOut.writeInt(data.length);
246        dOut.write(data);
247    }
248
249    public static String readBytesAsString(DataInput dIn) throws IOException {
250        int length = dIn.readInt();
251        byte[] data = new byte[length];
252        dIn.readFully(data);
253        return new String(data, CodecFactory.UTF_8_ENCODING);
254    }
255
256}