001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.BufferedReader;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.io.OutputStream;
025import java.io.OutputStreamWriter;
026import java.io.Writer;
027import java.math.BigInteger;
028import java.security.MessageDigest;
029import java.security.NoSuchAlgorithmException;
030import java.security.PrivilegedExceptionAction;
031import java.util.ArrayList;
032import java.util.Collection;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Properties;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.io.SequenceFile;
042import org.apache.hadoop.io.Text;
043import org.apache.hadoop.mapred.JobConf;
044import org.apache.hadoop.mapred.RunningJob;
045import org.apache.hadoop.mapred.Counters;
046import org.apache.hadoop.security.UserGroupInformation;
047import org.apache.oozie.client.OozieClient;
048import org.apache.oozie.client.WorkflowAction;
049import org.apache.oozie.service.HadoopAccessorException;
050import org.apache.oozie.service.HadoopAccessorService;
051import org.apache.oozie.service.Services;
052import org.apache.oozie.service.URIHandlerService;
053import org.apache.oozie.service.UserGroupInformationService;
054import org.apache.oozie.util.IOUtils;
055import org.apache.oozie.util.PropertiesUtils;
056
057public class LauncherMapperHelper {
058
059    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
060            throws HadoopAccessorException, IOException {
061        String jobId = null;
062        Path recoveryFile = new Path(actionDir, recoveryId);
063        FileSystem fs = Services.get().get(HadoopAccessorService.class)
064                .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
065
066        if (fs.exists(recoveryFile)) {
067            InputStream is = fs.open(recoveryFile);
068            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
069            jobId = reader.readLine();
070            reader.close();
071        }
072        return jobId;
073
074    }
075
076    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
077        // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via
078        // <configuration> property
079        if (javaMainClass != null && !javaMainClass.equals("")) {
080            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
081        }
082    }
083
084    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
085        for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
086            launcherConf.set(entry.getKey(), entry.getValue());
087        }
088    }
089
090    public static void setupMainArguments(Configuration launcherConf, String[] args) {
091        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
092        for (int i = 0; i < args.length; i++) {
093            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
094        }
095    }
096
097    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
098        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
099    }
100
101    /**
102     * Set the maximum value of stats data
103     *
104     * @param launcherConf the oozie launcher configuration
105     * @param maxStatsData the maximum allowed size of stats data
106     */
107    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
108        launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
109    }
110
111    /**
112     * Set the maximum number of globbed files/dirs
113     *
114     * @param launcherConf the oozie launcher configuration
115     * @param fsGlobMax the maximum number of files/dirs for FS operation
116     */
117    public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
118        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
119    }
120
121    public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
122            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
123
124        launcherConf.setMapperClass(LauncherMapper.class);
125        launcherConf.setSpeculativeExecution(false);
126        launcherConf.setNumMapTasks(1);
127        launcherConf.setNumReduceTasks(0);
128
129        launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
130        launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
131        launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
132        launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
133        launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
134
135        actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
136        actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
137
138        if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
139          List<String> purgedEntries = new ArrayList<String>();
140          Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
141          for (String entry : entries) {
142            if (entry.contains("#")) {
143              purgedEntries.add(entry);
144            }
145          }
146          actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
147          launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
148        }
149
150        FileSystem fs =
151          Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
152                                                                           actionDir.toUri(), launcherConf);
153        fs.mkdirs(actionDir);
154
155        OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
156        try {
157            actionConf.writeXml(os);
158        } finally {
159            IOUtils.closeSafely(os);
160        }
161
162        launcherConf.setInputFormat(OozieLauncherInputFormat.class);
163        launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
164    }
165
166    public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String actionId)
167            throws NoSuchAlgorithmException {
168        launcherJobConf.setLong("oozie.job.launch.time", System.currentTimeMillis());
169        // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
170        String tag = getTag(actionId);
171        actionConf.set("mapreduce.job.tags", tag);
172    }
173
174    private static String getTag(String actionId) throws NoSuchAlgorithmException {
175        MessageDigest digest = MessageDigest.getInstance("MD5");
176        digest.update(actionId.getBytes(), 0, actionId.length());
177        String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
178        return md5;
179    }
180
181    public static boolean isMainDone(RunningJob runningJob) throws IOException {
182        return runningJob.isComplete();
183    }
184
185    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
186        boolean succeeded = runningJob.isSuccessful();
187        if (succeeded) {
188            Counters counters = runningJob.getCounters();
189            if (counters != null) {
190                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
191                if (group != null) {
192                    succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
193                }
194            }
195        }
196        return succeeded;
197    }
198
199    /**
200     * Determine whether action has external child jobs or not
201     * @param actionData
202     * @return true/false
203     * @throws IOException
204     */
205    public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
206        return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
207    }
208
209    /**
210     * Determine whether action has output data or not
211     * @param actionData
212     * @return true/false
213     * @throws IOException
214     */
215    public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
216        return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
217    }
218
219    /**
220     * Determine whether action has external stats or not
221     * @param actionData
222     * @return true/false
223     * @throws IOException
224     */
225    public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
226        return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
227    }
228
229    /**
230     * Determine whether action has new id (id swap) or not
231     * @param actionData
232     * @return true/false
233     * @throws IOException
234     */
235    public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
236        return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
237    }
238
239    /**
240     * Get the sequence file path storing all action data
241     * @param actionDir
242     * @return
243     */
244    public static Path getActionDataSequenceFilePath(Path actionDir) {
245        return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
246    }
247
248    /**
249     * Utility function to load the contents of action data sequence file into
250     * memory object
251     *
252     * @param fs Action Filesystem
253     * @param actionDir Path
254     * @param conf Configuration
255     * @return Map action data
256     * @throws IOException
257     * @throws InterruptedException
258     */
259    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
260            throws IOException, InterruptedException {
261        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
262        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
263
264        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
265            @Override
266            public Map<String, String> run() throws IOException {
267                Map<String, String> ret = new HashMap<String, String>();
268                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
269                if (fs.exists(seqFilePath)) {
270                    SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
271                    Text key = new Text(), value = new Text();
272                    while (seqFile.next(key, value)) {
273                        ret.put(key.toString(), value.toString());
274                    }
275                    seqFile.close();
276                }
277                else { // maintain backward-compatibility. to be deprecated
278                    org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
279                    InputStream is;
280                    BufferedReader reader = null;
281                    Properties props;
282                    if (files != null && files.length > 0) {
283                        for (int x = 0; x < files.length; x++) {
284                            Path file = files[x].getPath();
285                            if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
286                                is = fs.open(file);
287                                reader = new BufferedReader(new InputStreamReader(is));
288                                ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
289                                        IOUtils.getReaderAsString(reader, -1));
290                            }
291                            else if (file.equals(new Path(actionDir, "newId.properties"))) {
292                                is = fs.open(file);
293                                reader = new BufferedReader(new InputStreamReader(is));
294                                props = PropertiesUtils.readProperties(reader, -1);
295                                ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
296                            }
297                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
298                                int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
299                                        2 * 1024);
300                                is = fs.open(file);
301                                reader = new BufferedReader(new InputStreamReader(is));
302                                ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
303                                        .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
304                            }
305                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
306                                int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
307                                        Integer.MAX_VALUE);
308                                is = fs.open(file);
309                                reader = new BufferedReader(new InputStreamReader(is));
310                                ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
311                                        .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
312                            }
313                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
314                                is = fs.open(file);
315                                reader = new BufferedReader(new InputStreamReader(is));
316                                ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
317                            }
318                        }
319                    }
320                }
321                return ret;
322            }
323        });
324    }
325}