001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import org.apache.oozie.util.XLogStreamer;
021    import org.apache.oozie.service.XLogService;
022    import org.apache.oozie.service.DagXLogInfoService;
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.client.CoordinatorJob;
025    import org.apache.oozie.client.WorkflowJob;
026    import org.apache.oozie.client.OozieClient;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.wf.CompletedActionXCommand;
029    import org.apache.oozie.command.wf.DefinitionXCommand;
030    import org.apache.oozie.command.wf.ExternalIdXCommand;
031    import org.apache.oozie.command.wf.JobXCommand;
032    import org.apache.oozie.command.wf.JobsXCommand;
033    import org.apache.oozie.command.wf.KillXCommand;
034    import org.apache.oozie.command.wf.ReRunXCommand;
035    import org.apache.oozie.command.wf.ResumeXCommand;
036    import org.apache.oozie.command.wf.StartXCommand;
037    import org.apache.oozie.command.wf.SubmitHiveXCommand;
038    import org.apache.oozie.command.wf.SubmitHttpXCommand;
039    import org.apache.oozie.command.wf.SubmitMRXCommand;
040    import org.apache.oozie.command.wf.SubmitPigXCommand;
041    import org.apache.oozie.command.wf.SubmitXCommand;
042    import org.apache.oozie.command.wf.SuspendXCommand;
043    import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.service.CallableQueueService;
046    import org.apache.oozie.util.ParamChecker;
047    import org.apache.oozie.util.XCallable;
048    import org.apache.oozie.util.XLog;
049    
050    import java.io.Writer;
051    import java.util.Date;
052    import java.util.List;
053    import java.util.Properties;
054    import java.util.Set;
055    import java.util.HashSet;
056    import java.util.StringTokenizer;
057    import java.util.Map;
058    import java.util.HashMap;
059    import java.util.ArrayList;
060    import java.io.IOException;
061    
062    /**
063     * The DagEngine provides all the DAG engine functionality for WS calls.
064     */
065    public class DagEngine extends BaseEngine {
066    
067        private static final int HIGH_PRIORITY = 2;
068        private static XLog LOG = XLog.getLog(DagEngine.class);
069    
070        /**
071         * Create a system Dag engine, with no user and no group.
072         */
073        public DagEngine() {
074            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
075                LOG.debug("Oozie DagEngine is not using XCommands.");
076            }
077            else {
078                LOG.debug("Oozie DagEngine is using XCommands.");
079            }
080        }
081    
082        /**
083         * Create a Dag engine to perform operations on behave of a user.
084         *
085         * @param user user name.
086         */
087        public DagEngine(String user) {
088            this();
089    
090            this.user = ParamChecker.notEmpty(user, "user");
091        }
092    
093        /**
094         * Submit a workflow job. <p/> It validates configuration properties.
095         *
096         * @param conf job configuration.
097         * @param startJob indicates if the job should be started or not.
098         * @return the job Id.
099         * @throws DagEngineException thrown if the job could not be created.
100         */
101        @Override
102        public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
103            validateSubmitConfiguration(conf);
104    
105            try {
106                String jobId;
107                SubmitXCommand submit = new SubmitXCommand(conf);
108                jobId = submit.call();
109                if (startJob) {
110                    start(jobId);
111                }
112                return jobId;
113            }
114            catch (CommandException ex) {
115                throw new DagEngineException(ex);
116            }
117        }
118    
119        /**
120         * Submit a workflow through a coordinator. It validates configuration properties.
121         * @param conf job conf
122         * @param parentId parent of workflow
123         * @return
124         * @throws DagEngineException
125         */
126        public String submitJobFromCoordinator(Configuration conf, String parentId) throws DagEngineException {
127            validateSubmitConfiguration(conf);
128            try {
129                String jobId;
130                SubmitXCommand submit = new SubmitXCommand(conf, parentId);
131                jobId = submit.call();
132                start(jobId);
133                return jobId;
134            }
135            catch (CommandException ex) {
136                throw new DagEngineException(ex);
137            }
138        }
139    
140        /**
141         * Submit a pig/hive/mapreduce job through HTTP.
142         * <p/>
143         * It validates configuration properties.
144         *
145         * @param conf job configuration.
146         * @param jobType job type - can be "pig", "hive, or "mapreduce".
147         * @return the job Id.
148         * @throws DagEngineException thrown if the job could not be created.
149         */
150        public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
151            validateSubmitConfiguration(conf);
152    
153            try {
154                String jobId;
155                SubmitHttpXCommand submit = null;
156                if (jobType.equals("pig")) {
157                    submit = new SubmitPigXCommand(conf);
158                }
159                else if (jobType.equals("mapreduce")) {
160                    submit = new SubmitMRXCommand(conf);
161                }
162                else if (jobType.equals("hive")) {
163                    submit = new SubmitHiveXCommand(conf);
164                }
165    
166                jobId = submit.call();
167                start(jobId);
168                return jobId;
169            }
170            catch (CommandException ex) {
171                throw new DagEngineException(ex);
172            }
173        }
174    
175        private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
176            if (conf.get(OozieClient.APP_PATH) == null) {
177                throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
178            }
179        }
180    
181        /**
182         * Start a job.
183         *
184         * @param jobId job Id.
185         * @throws DagEngineException thrown if the job could not be started.
186         */
187        @Override
188        public void start(String jobId) throws DagEngineException {
189            // Changing to synchronous call from asynchronous queuing to prevent the
190            // loss of command if the queue is full or the queue is lost in case of
191            // failure.
192            try {
193                    new StartXCommand(jobId).call();
194            }
195            catch (CommandException e) {
196                throw new DagEngineException(e);
197            }
198        }
199    
200        /**
201         * Resume a job.
202         *
203         * @param jobId job Id.
204         * @throws DagEngineException thrown if the job could not be resumed.
205         */
206        @Override
207        public void resume(String jobId) throws DagEngineException {
208            // Changing to synchronous call from asynchronous queuing to prevent the
209            // loss of command if the queue is full or the queue is lost in case of
210            // failure.
211            try {
212                    new ResumeXCommand(jobId).call();
213            }
214            catch (CommandException e) {
215                throw new DagEngineException(e);
216            }
217        }
218    
219        /**
220         * Suspend a job.
221         *
222         * @param jobId job Id.
223         * @throws DagEngineException thrown if the job could not be suspended.
224         */
225        @Override
226        public void suspend(String jobId) throws DagEngineException {
227            // Changing to synchronous call from asynchronous queuing to prevent the
228            // loss of command if the queue is full or the queue is lost in case of
229            // failure.
230            try {
231                            new SuspendXCommand(jobId).call();
232            }
233            catch (CommandException e) {
234                throw new DagEngineException(e);
235            }
236        }
237    
238        /**
239         * Kill a job.
240         *
241         * @param jobId job Id.
242         * @throws DagEngineException thrown if the job could not be killed.
243         */
244        @Override
245        public void kill(String jobId) throws DagEngineException {
246            // Changing to synchronous call from asynchronous queuing to prevent the
247            // loss of command if the queue is full or the queue is lost in case of
248            // failure.
249            try {
250                            new KillXCommand(jobId).call();
251                            LOG.info("User " + user + " killed the WF job " + jobId);
252            }
253            catch (CommandException e) {
254                throw new DagEngineException(e);
255            }
256        }
257    
    /* (non-Javadoc)
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws DagEngineException {
        // Workflow jobs do not support the 'change' operation; reaching this
        // method indicates a routing error, so always fail with E1017.
        throw new DagEngineException(ErrorCode.E1017);
    }
266    
267        /**
268         * Rerun a job.
269         *
270         * @param jobId job Id to rerun.
271         * @param conf configuration information for the rerun.
272         * @throws DagEngineException thrown if the job could not be rerun.
273         */
274        @Override
275        public void reRun(String jobId, Configuration conf) throws DagEngineException {
276            try {
277                validateReRunConfiguration(conf);
278                new ReRunXCommand(jobId, conf).call();
279                start(jobId);
280            }
281            catch (CommandException ex) {
282                throw new DagEngineException(ex);
283            }
284        }
285    
286        private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
287            if (conf.get(OozieClient.APP_PATH) == null) {
288                throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
289            }
290            if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
291                throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
292                        + OozieClient.RERUN_FAIL_NODES);
293            }
294            if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
295                throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
296                        + OozieClient.RERUN_FAIL_NODES);
297            }
298        }
299    
300        /**
301         * Process an action callback.
302         *
303         * @param actionId the action Id.
304         * @param externalStatus the action external status.
305         * @param actionData the action output data, <code>null</code> if none.
306         * @throws DagEngineException thrown if the callback could not be processed.
307         */
308        public void processCallback(String actionId, String externalStatus, Properties actionData)
309                throws DagEngineException {
310            XLog.Info.get().clearParameter(XLogService.GROUP);
311            XLog.Info.get().clearParameter(XLogService.USER);
312            XCallable<Void> command = null;
313    
314                    command = new CompletedActionXCommand(actionId, externalStatus,
315                                    actionData, HIGH_PRIORITY);
316            if (!Services.get().get(CallableQueueService.class).queue(command)) {
317                LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
318            }
319        }
320    
321        /**
322         * Return the info about a job.
323         *
324         * @param jobId job Id.
325         * @return the workflow job info.
326         * @throws DagEngineException thrown if the job info could not be obtained.
327         */
328        @Override
329        public WorkflowJob getJob(String jobId) throws DagEngineException {
330            try {
331                    return new JobXCommand(jobId).call();
332            }
333            catch (CommandException ex) {
334                throw new DagEngineException(ex);
335            }
336        }
337    
338        /**
339         * Return the info about a job with actions subset.
340         *
341         * @param jobId job Id
342         * @param start starting from this index in the list of actions belonging to the job
343         * @param length number of actions to be returned
344         * @return the workflow job info.
345         * @throws DagEngineException thrown if the job info could not be obtained.
346         */
347        @Override
348        public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
349            try {
350                            return new JobXCommand(jobId, start, length).call();
351            }
352            catch (CommandException ex) {
353                throw new DagEngineException(ex);
354            }
355        }
356    
357        /**
358         * Return the a job definition.
359         *
360         * @param jobId job Id.
361         * @return the job definition.
362         * @throws DagEngineException thrown if the job definition could no be obtained.
363         */
364        @Override
365        public String getDefinition(String jobId) throws DagEngineException {
366            try {
367                            return new DefinitionXCommand(jobId).call();
368            }
369            catch (CommandException ex) {
370                throw new DagEngineException(ex);
371            }
372        }
373    
374        /**
375         * Stream the log of a job.
376         *
377         * @param jobId job Id.
378         * @param writer writer to stream the log to.
379         * @throws IOException thrown if the log cannot be streamed.
380         * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
381         */
382        @Override
383        public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
384            XLogStreamer.Filter filter = new XLogStreamer.Filter();
385            filter.setParameter(DagXLogInfoService.JOB, jobId);
386            WorkflowJob job = getJob(jobId);
387            Date lastTime = job.getEndTime();
388            if (lastTime == null) {
389                lastTime = job.getLastModifiedTime();
390            }
391            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
392        }
393    
394        private static final Set<String> FILTER_NAMES = new HashSet<String>();
395    
396        static {
397            FILTER_NAMES.add(OozieClient.FILTER_USER);
398            FILTER_NAMES.add(OozieClient.FILTER_NAME);
399            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
400            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
401            FILTER_NAMES.add(OozieClient.FILTER_ID);
402        }
403    
404        /**
405         * Validate a jobs filter.
406         *
407         * @param filter filter to validate.
408         * @return the parsed filter.
409         * @throws DagEngineException thrown if the filter is invalid.
410         */
411        protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
412            Map<String, List<String>> map = new HashMap<String, List<String>>();
413            if (filter != null) {
414                StringTokenizer st = new StringTokenizer(filter, ";");
415                while (st.hasMoreTokens()) {
416                    String token = st.nextToken();
417                    if (token.contains("=")) {
418                        String[] pair = token.split("=");
419                        if (pair.length != 2) {
420                            throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
421                        }
422                        if (!FILTER_NAMES.contains(pair[0])) {
423                            throw new DagEngineException(ErrorCode.E0420, filter, XLog
424                                    .format("invalid name [{0}]", pair[0]));
425                        }
426                        if (pair[0].equals("status")) {
427                            try {
428                                WorkflowJob.Status.valueOf(pair[1]);
429                            }
430                            catch (IllegalArgumentException ex) {
431                                throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
432                                                                                                  pair[1]));
433                            }
434                        }
435                        List<String> list = map.get(pair[0]);
436                        if (list == null) {
437                            list = new ArrayList<String>();
438                            map.put(pair[0], list);
439                        }
440                        list.add(pair[1]);
441                    }
442                    else {
443                        throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
444                    }
445                }
446            }
447            return map;
448        }
449    
450        /**
451         * Return the info about a set of jobs.
452         *
453         * @param filter job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
454         * @param start offset, base 1.
455         * @param len number of jobs to return.
456         * @return job info for all matching jobs, the jobs don't contain node action information.
457         * @throws DagEngineException thrown if the jobs info could not be obtained.
458         */
459        public WorkflowsInfo getJobs(String filter, int start, int len) throws DagEngineException {
460            Map<String, List<String>> filterList = parseFilter(filter);
461            try {
462                            return new JobsXCommand(filterList, start, len).call();
463            }
464            catch (CommandException dce) {
465                throw new DagEngineException(dce);
466            }
467        }
468    
469        /**
470         * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
471         *
472         * @param externalId external ID provided at job submission time.
473         * @return the associated workflow job ID if any, <code>null</code> if none.
474         * @throws DagEngineException thrown if the lookup could not be done.
475         */
476        @Override
477        public String getJobIdForExternalId(String externalId) throws DagEngineException {
478            try {
479                            return new ExternalIdXCommand(externalId).call();
480            }
481            catch (CommandException dce) {
482                throw new DagEngineException(dce);
483            }
484        }
485    
    /**
     * Not supported: a DagEngine cannot return coordinator jobs.
     *
     * @throws BaseEngineException always (E0301).
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
    }
490    
    /**
     * Not supported: a DagEngine cannot return coordinator jobs.
     *
     * @throws BaseEngineException always (E0301).
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
    }
495    
496        public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
497            try {
498                            return new WorkflowActionInfoXCommand(actionId).call();
499            }
500            catch (CommandException ex) {
501                throw new BaseEngineException(ex);
502            }
503        }
504    
505        /* (non-Javadoc)
506         * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
507         */
508        @Override
509        public String dryRunSubmit(Configuration conf) throws BaseEngineException {
510            try {
511                SubmitXCommand submit = new SubmitXCommand(true, conf);
512                return submit.call();
513            } catch (CommandException ex) {
514                throw new DagEngineException(ex);
515            }
516        }
517    }