001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.OutputStream;
026import java.math.BigInteger;
027import java.security.MessageDigest;
028import java.security.NoSuchAlgorithmException;
029import java.security.PrivilegedExceptionAction;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Properties;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.io.SequenceFile;
041import org.apache.hadoop.io.Text;
042import org.apache.hadoop.mapred.JobConf;
043import org.apache.hadoop.mapred.RunningJob;
044import org.apache.hadoop.mapred.Counters;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.oozie.client.OozieClient;
047import org.apache.oozie.client.WorkflowAction;
048import org.apache.oozie.service.HadoopAccessorException;
049import org.apache.oozie.service.HadoopAccessorService;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.service.URIHandlerService;
052import org.apache.oozie.service.UserGroupInformationService;
053import org.apache.oozie.util.IOUtils;
054import org.apache.oozie.util.PropertiesUtils;
055
056public class LauncherMapperHelper {
057
058    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
059
060    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
061            throws HadoopAccessorException, IOException {
062        String jobId = null;
063        Path recoveryFile = new Path(actionDir, recoveryId);
064        FileSystem fs = Services.get().get(HadoopAccessorService.class)
065                .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
066
067        if (fs.exists(recoveryFile)) {
068            InputStream is = fs.open(recoveryFile);
069            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
070            jobId = reader.readLine();
071            reader.close();
072        }
073        return jobId;
074
075    }
076
077    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
078        // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via
079        // <configuration> property
080        if (javaMainClass != null && !javaMainClass.equals("")) {
081            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
082        }
083    }
084
085    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
086        for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
087            launcherConf.set(entry.getKey(), entry.getValue());
088        }
089    }
090
091    public static void setupMainArguments(Configuration launcherConf, String[] args) {
092        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
093        for (int i = 0; i < args.length; i++) {
094            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
095        }
096    }
097
098    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
099        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
100    }
101
102    /**
103     * Set the maximum value of stats data
104     *
105     * @param launcherConf the oozie launcher configuration
106     * @param maxStatsData the maximum allowed size of stats data
107     */
108    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
109        launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
110    }
111
112    /**
113     * Set the maximum number of globbed files/dirs
114     *
115     * @param launcherConf the oozie launcher configuration
116     * @param fsGlobMax the maximum number of files/dirs for FS operation
117     */
118    public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
119        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
120    }
121
122    public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
123            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
124
125        launcherConf.setMapperClass(LauncherMapper.class);
126        launcherConf.setSpeculativeExecution(false);
127        launcherConf.setNumMapTasks(1);
128        launcherConf.setNumReduceTasks(0);
129
130        launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
131        launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
132        launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
133        launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
134        launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
135
136        actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
137        actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
138
139        if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
140          List<String> purgedEntries = new ArrayList<String>();
141          Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
142          for (String entry : entries) {
143            if (entry.contains("#")) {
144              purgedEntries.add(entry);
145            }
146          }
147          actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
148          launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
149        }
150
151        FileSystem fs =
152          Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
153                                                                           actionDir.toUri(), launcherConf);
154        fs.mkdirs(actionDir);
155
156        OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
157        try {
158            actionConf.writeXml(os);
159        } finally {
160            IOUtils.closeSafely(os);
161        }
162
163        launcherConf.setInputFormat(OozieLauncherInputFormat.class);
164        launcherConf.setOutputFormat(OozieLauncherOutputFormat.class);
165        launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class);
166    }
167
168    public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag,
169                                                long launcherTime)
170            throws NoSuchAlgorithmException {
171        launcherJobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, launcherTime);
172        // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
173        String tag = getTag(launcherTag);
174        // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself.
175        // mapreduce.job.tags should only go to child job launch by launcher.
176        actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag);
177    }
178
179    public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
180        MessageDigest digest = MessageDigest.getInstance("MD5");
181        digest.update(launcherTag.getBytes(), 0, launcherTag.length());
182        String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
183        return md5;
184    }
185
186    public static boolean isMainDone(RunningJob runningJob) throws IOException {
187        return runningJob.isComplete();
188    }
189
190    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
191        boolean succeeded = runningJob.isSuccessful();
192        if (succeeded) {
193            Counters counters = runningJob.getCounters();
194            if (counters != null) {
195                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
196                if (group != null) {
197                    succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
198                }
199            }
200        }
201        return succeeded;
202    }
203
204    /**
205     * Determine whether action has external child jobs or not
206     * @param actionData
207     * @return true/false
208     * @throws IOException
209     */
210    public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
211        return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
212    }
213
214    /**
215     * Determine whether action has output data or not
216     * @param actionData
217     * @return true/false
218     * @throws IOException
219     */
220    public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
221        return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
222    }
223
224    /**
225     * Determine whether action has external stats or not
226     * @param actionData
227     * @return true/false
228     * @throws IOException
229     */
230    public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
231        return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
232    }
233
234    /**
235     * Determine whether action has new id (id swap) or not
236     * @param actionData
237     * @return true/false
238     * @throws IOException
239     */
240    public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
241        return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
242    }
243
244    /**
245     * Get the sequence file path storing all action data
246     * @param actionDir
247     * @return
248     */
249    public static Path getActionDataSequenceFilePath(Path actionDir) {
250        return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
251    }
252
253    /**
254     * Utility function to load the contents of action data sequence file into
255     * memory object
256     *
257     * @param fs Action Filesystem
258     * @param actionDir Path
259     * @param conf Configuration
260     * @return Map action data
261     * @throws IOException
262     * @throws InterruptedException
263     */
264    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
265            throws IOException, InterruptedException {
266        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
267        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
268
269        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
270            @Override
271            public Map<String, String> run() throws IOException {
272                Map<String, String> ret = new HashMap<String, String>();
273                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
274                if (fs.exists(seqFilePath)) {
275                    SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
276                    Text key = new Text(), value = new Text();
277                    while (seqFile.next(key, value)) {
278                        ret.put(key.toString(), value.toString());
279                    }
280                    seqFile.close();
281                }
282                else { // maintain backward-compatibility. to be deprecated
283                    org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
284                    InputStream is;
285                    BufferedReader reader = null;
286                    Properties props;
287                    if (files != null && files.length > 0) {
288                        for (int x = 0; x < files.length; x++) {
289                            Path file = files[x].getPath();
290                            if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
291                                is = fs.open(file);
292                                reader = new BufferedReader(new InputStreamReader(is));
293                                ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
294                                        IOUtils.getReaderAsString(reader, -1));
295                            }
296                            else if (file.equals(new Path(actionDir, "newId.properties"))) {
297                                is = fs.open(file);
298                                reader = new BufferedReader(new InputStreamReader(is));
299                                props = PropertiesUtils.readProperties(reader, -1);
300                                ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
301                            }
302                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
303                                int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
304                                        2 * 1024);
305                                is = fs.open(file);
306                                reader = new BufferedReader(new InputStreamReader(is));
307                                ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
308                                        .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
309                            }
310                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
311                                int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
312                                        Integer.MAX_VALUE);
313                                is = fs.open(file);
314                                reader = new BufferedReader(new InputStreamReader(is));
315                                ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
316                                        .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
317                            }
318                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
319                                is = fs.open(file);
320                                reader = new BufferedReader(new InputStreamReader(is));
321                                ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
322                            }
323                        }
324                    }
325                }
326                return ret;
327            }
328        });
329    }
330
331    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
332        String tag;
333        if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
334            tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName();
335        } else if (parentId != null) {
336            tag = parentId + "@" + wfAction.getName();
337        } else {
338            tag = wfAction.getId();
339        }
340        return tag;
341    }
342
343}