001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.BufferedReader;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.io.OutputStream;
025    import java.io.OutputStreamWriter;
026    import java.io.Writer;
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.List;
030    import java.util.Map;
031    
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.fs.FileSystem;
034    import org.apache.hadoop.fs.Path;
035    import org.apache.hadoop.mapred.Counters;
036    import org.apache.hadoop.mapred.JobConf;
037    import org.apache.hadoop.mapred.RunningJob;
038    import org.apache.oozie.service.HadoopAccessorException;
039    import org.apache.oozie.service.HadoopAccessorService;
040    import org.apache.oozie.service.Services;
041    import org.apache.oozie.service.URIHandlerService;
042    import org.apache.oozie.util.XLog;
043    
044    public class LauncherMapperHelper {
045    
046        public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
047                throws HadoopAccessorException, IOException {
048            String jobId = null;
049            Path recoveryFile = new Path(actionDir, recoveryId);
050            FileSystem fs = Services.get().get(HadoopAccessorService.class)
051                    .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
052    
053            if (fs.exists(recoveryFile)) {
054                InputStream is = fs.open(recoveryFile);
055                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
056                jobId = reader.readLine();
057                reader.close();
058            }
059            return jobId;
060    
061        }
062    
063        public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
064            // Only set the javaMainClass if its not null or empty string (should be the case except for java action), this way the user
065            // can override the action's main class via <configuration> property
066            if (javaMainClass != null && !javaMainClass.equals("")) {
067                launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
068            }
069        }
070    
071        public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
072            for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
073                launcherConf.set(entry.getKey(), entry.getValue());
074            }
075        }
076    
077        public static void setupMainArguments(Configuration launcherConf, String[] args) {
078            launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
079            for (int i = 0; i < args.length; i++) {
080                launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
081            }
082        }
083    
084        public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
085            launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
086        }
087    
088        /**
089         * Set the maximum value of stats data
090         *
091         * @param launcherConf the oozie launcher configuration
092         * @param maxStatsData the maximum allowed size of stats data
093         */
094        public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
095            launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
096        }
097    
098        public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
099                String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
100    
101            launcherConf.setMapperClass(LauncherMapper.class);
102            launcherConf.setSpeculativeExecution(false);
103            launcherConf.setNumMapTasks(1);
104            launcherConf.setNumReduceTasks(0);
105    
106            launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
107            launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
108            launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
109            launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
110            launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
111    
112            actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
113            actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
114    
115            if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
116              List<String> purgedEntries = new ArrayList<String>();
117              Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
118              for (String entry : entries) {
119                if (entry.contains("#")) {
120                  purgedEntries.add(entry);
121                }
122              }
123              actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
124              launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
125            }
126    
127            FileSystem fs =
128              Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
129                                                                               actionDir.toUri(), launcherConf);
130            fs.mkdirs(actionDir);
131    
132            OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
133            actionConf.writeXml(os);
134            os.close();
135    
136            Path inputDir = new Path(actionDir, "input");
137            fs.mkdirs(inputDir);
138            Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "dummy.txt")));
139            writer.write("dummy");
140            writer.close();
141    
142            launcherConf.set("mapred.input.dir", inputDir.toString());
143            launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
144        }
145    
146        public static boolean isMainDone(RunningJob runningJob) throws IOException {
147            return runningJob.isComplete();
148        }
149    
150        public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
151            boolean succeeded = runningJob.isSuccessful();
152            if (succeeded) {
153                Counters counters = runningJob.getCounters();
154                if (counters != null) {
155                    Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
156                    if (group != null) {
157                        succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
158                    }
159                }
160            }
161            return succeeded;
162        }
163    
164        public static boolean hasOutputData(RunningJob runningJob) throws IOException {
165            boolean output = false;
166            Counters counters = runningJob.getCounters();
167            if (counters != null) {
168                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
169                if (group != null) {
170                    output = group.getCounter(LauncherMapper.COUNTER_OUTPUT_DATA) == 1;
171                }
172            }
173            return output;
174        }
175    
176        /**
177         * Check whether runningJob has stats data or not
178         *
179         * @param runningJob the runningJob
180         * @return returns whether the running Job has stats data or not
181         * @throws IOException
182         */
183        public static boolean hasStatsData(RunningJob runningJob) throws IOException{
184            boolean output = false;
185            Counters counters = runningJob.getCounters();
186            if (counters != null) {
187                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
188                if (group != null) {
189                    output = group.getCounter(LauncherMapper.COUNTER_STATS_DATA) == 1;
190                }
191            }
192            return output;
193        }
194    
195        public static boolean hasIdSwap(RunningJob runningJob) throws IOException {
196            boolean swap = false;
197            Counters counters = runningJob.getCounters();
198            if (counters != null) {
199                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
200                if (group != null) {
201                    swap = group.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
202                }
203            }
204            return swap;
205        }
206    
207        public static boolean hasIdSwap(RunningJob runningJob, String user, String group, Path actionDir)
208                throws IOException, HadoopAccessorException {
209            boolean swap = false;
210    
211            XLog log = XLog.getLog("org.apache.oozie.action.hadoop.LauncherMapper");
212    
213            Counters counters = runningJob.getCounters();
214            if (counters != null) {
215                Counters.Group counterGroup = counters.getGroup(LauncherMapper.COUNTER_GROUP);
216                if (counterGroup != null) {
217                    swap = counterGroup.getCounter(LauncherMapper.COUNTER_DO_ID_SWAP) == 1;
218                }
219            }
220            // additional check for swapped hadoop ID
221            // Can't rely on hadoop counters existing
222            // we'll check for the newID file in hdfs if the hadoop counters is null
223            else {
224    
225                Path p = getIdSwapPath(actionDir);
226                // log.debug("Checking for newId file in: [{0}]", p);
227    
228                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
229                Configuration conf = has.createJobConf(p.toUri().getAuthority());
230                FileSystem fs = has.createFileSystem(user, p.toUri(), conf);
231                if (fs.exists(p)) {
232                    log.debug("Hadoop Counters is null, but found newID file.");
233    
234                    swap = true;
235                }
236                else {
237                    log.debug("Hadoop Counters is null, and newID file doesn't exist at: [{0}]", p);
238                }
239            }
240            return swap;
241        }
242    
243        public static Path getOutputDataPath(Path actionDir) {
244            return new Path(actionDir, LauncherMapper.ACTION_OUTPUT_PROPS);
245        }
246    
247        /**
248         * Get the location of stats file
249         *
250         * @param actionDir the action directory
251         * @return the hdfs location of the file
252         */
253        public static Path getActionStatsDataPath(Path actionDir){
254            return new Path(actionDir, LauncherMapper.ACTION_STATS_PROPS);
255        }
256    
257        /**
258         * Get the location of external Child IDs file
259         *
260         * @param actionDir the action directory
261         * @return the hdfs location of the file
262         */
263        public static Path getExternalChildIDsDataPath(Path actionDir){
264            return new Path(actionDir, LauncherMapper.ACTION_EXTERNAL_CHILD_IDS_PROPS);
265        }
266    
267        public static Path getErrorPath(Path actionDir) {
268            return new Path(actionDir, LauncherMapper.ACTION_ERROR_PROPS);
269        }
270    
271        public static Path getIdSwapPath(Path actionDir) {
272            return new Path(actionDir, LauncherMapper.ACTION_NEW_ID_PROPS);
273        }
274    }