001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import org.apache.oozie.service.XLogService;
021import org.apache.oozie.service.DagXLogInfoService;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.client.CoordinatorJob;
024import org.apache.oozie.client.WorkflowJob;
025import org.apache.oozie.client.OozieClient;
026import org.apache.oozie.command.CommandException;
027import org.apache.oozie.command.wf.CompletedActionXCommand;
028import org.apache.oozie.command.wf.DefinitionXCommand;
029import org.apache.oozie.command.wf.ExternalIdXCommand;
030import org.apache.oozie.command.wf.JobXCommand;
031import org.apache.oozie.command.wf.JobsXCommand;
032import org.apache.oozie.command.wf.KillXCommand;
033import org.apache.oozie.command.wf.ReRunXCommand;
034import org.apache.oozie.command.wf.ResumeXCommand;
035import org.apache.oozie.command.wf.StartXCommand;
036import org.apache.oozie.command.wf.SubmitHiveXCommand;
037import org.apache.oozie.command.wf.SubmitHttpXCommand;
038import org.apache.oozie.command.wf.SubmitMRXCommand;
039import org.apache.oozie.command.wf.SubmitPigXCommand;
040import org.apache.oozie.command.wf.SubmitSqoopXCommand;
041import org.apache.oozie.command.wf.SubmitXCommand;
042import org.apache.oozie.command.wf.SuspendXCommand;
043import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
046import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
047import org.apache.oozie.service.Services;
048import org.apache.oozie.service.CallableQueueService;
049import org.apache.oozie.util.XLogFilter;
050import org.apache.oozie.util.XLogUserFilterParam;
051import org.apache.oozie.util.ParamChecker;
052import org.apache.oozie.util.XCallable;
053import org.apache.oozie.util.XConfiguration;
054import org.apache.oozie.util.XLog;
055import org.apache.oozie.service.XLogStreamingService;
056
057import java.io.StringReader;
058import java.io.Writer;
059import java.util.Date;
060import java.util.List;
061import java.util.Properties;
062import java.util.Set;
063import java.util.HashSet;
064import java.util.StringTokenizer;
065import java.util.Map;
066import java.util.HashMap;
067import java.util.ArrayList;
068import java.io.IOException;
069
070/**
071 * The DagEngine provides all the DAG engine functionality for WS calls.
072 */
073public class DagEngine extends BaseEngine {
074
075    private static final int HIGH_PRIORITY = 2;
076    private static XLog LOG = XLog.getLog(DagEngine.class);
077
078    /**
079     * Create a system Dag engine, with no user and no group.
080     */
081    public DagEngine() {
082        if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
083            LOG.debug("Oozie DagEngine is not using XCommands.");
084        }
085        else {
086            LOG.debug("Oozie DagEngine is using XCommands.");
087        }
088    }
089
090    /**
091     * Create a Dag engine to perform operations on behave of a user.
092     *
093     * @param user user name.
094     */
095    public DagEngine(String user) {
096        this();
097
098        this.user = ParamChecker.notEmpty(user, "user");
099    }
100
101    /**
102     * Submit a workflow job. <p/> It validates configuration properties.
103     *
104     * @param conf job configuration.
105     * @param startJob indicates if the job should be started or not.
106     * @return the job Id.
107     * @throws DagEngineException thrown if the job could not be created.
108     */
109    @Override
110    public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
111        validateSubmitConfiguration(conf);
112
113        try {
114            String jobId;
115            SubmitXCommand submit = new SubmitXCommand(conf);
116            jobId = submit.call();
117            if (startJob) {
118                start(jobId);
119            }
120            return jobId;
121        }
122        catch (CommandException ex) {
123            throw new DagEngineException(ex);
124        }
125    }
126
127    /**
128     * Submit a workflow through a coordinator. It validates configuration properties.
129     * @param conf job conf
130     * @param parentId parent of workflow
131     * @return
132     * @throws DagEngineException
133     */
134    public String submitJobFromCoordinator(Configuration conf, String parentId) throws DagEngineException {
135        validateSubmitConfiguration(conf);
136        try {
137            String jobId;
138            SubmitXCommand submit = new SubmitXCommand(conf, parentId);
139            jobId = submit.call();
140            start(jobId);
141            return jobId;
142        }
143        catch (CommandException ex) {
144            throw new DagEngineException(ex);
145        }
146    }
147
148    /**
149     * Submit a pig/hive/mapreduce job through HTTP.
150     * <p/>
151     * It validates configuration properties.
152     *
153     * @param conf job configuration.
154     * @param jobType job type - can be "pig", "hive", "sqoop" or "mapreduce".
155     * @return the job Id.
156     * @throws DagEngineException thrown if the job could not be created.
157     */
158    public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
159        validateSubmitConfiguration(conf);
160
161        try {
162            String jobId;
163            SubmitHttpXCommand submit = null;
164            if (jobType.equals("pig")) {
165                submit = new SubmitPigXCommand(conf);
166            }
167            else if (jobType.equals("mapreduce")) {
168                submit = new SubmitMRXCommand(conf);
169            }
170            else if (jobType.equals("hive")) {
171                submit = new SubmitHiveXCommand(conf);
172            }
173            else if (jobType.equals("sqoop")) {
174                submit = new SubmitSqoopXCommand(conf);
175            }
176
177            jobId = submit.call();
178            start(jobId);
179            return jobId;
180        }
181        catch (CommandException ex) {
182            throw new DagEngineException(ex);
183        }
184    }
185
186    private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
187        if (conf.get(OozieClient.APP_PATH) == null) {
188            throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
189        }
190    }
191
192    /**
193     * Start a job.
194     *
195     * @param jobId job Id.
196     * @throws DagEngineException thrown if the job could not be started.
197     */
198    @Override
199    public void start(String jobId) throws DagEngineException {
200        // Changing to synchronous call from asynchronous queuing to prevent the
201        // loss of command if the queue is full or the queue is lost in case of
202        // failure.
203        try {
204                new StartXCommand(jobId).call();
205        }
206        catch (CommandException e) {
207            throw new DagEngineException(e);
208        }
209    }
210
211    /**
212     * Resume a job.
213     *
214     * @param jobId job Id.
215     * @throws DagEngineException thrown if the job could not be resumed.
216     */
217    @Override
218    public void resume(String jobId) throws DagEngineException {
219        // Changing to synchronous call from asynchronous queuing to prevent the
220        // loss of command if the queue is full or the queue is lost in case of
221        // failure.
222        try {
223                new ResumeXCommand(jobId).call();
224        }
225        catch (CommandException e) {
226            throw new DagEngineException(e);
227        }
228    }
229
230    /**
231     * Suspend a job.
232     *
233     * @param jobId job Id.
234     * @throws DagEngineException thrown if the job could not be suspended.
235     */
236    @Override
237    public void suspend(String jobId) throws DagEngineException {
238        // Changing to synchronous call from asynchronous queuing to prevent the
239        // loss of command if the queue is full or the queue is lost in case of
240        // failure.
241        try {
242                        new SuspendXCommand(jobId).call();
243        }
244        catch (CommandException e) {
245            throw new DagEngineException(e);
246        }
247    }
248
249    /**
250     * Kill a job.
251     *
252     * @param jobId job Id.
253     * @throws DagEngineException thrown if the job could not be killed.
254     */
255    @Override
256    public void kill(String jobId) throws DagEngineException {
257        // Changing to synchronous call from asynchronous queuing to prevent the
258        // loss of command if the queue is full or the queue is lost in case of
259        // failure.
260        try {
261                        new KillXCommand(jobId).call();
262                        LOG.info("User " + user + " killed the WF job " + jobId);
263        }
264        catch (CommandException e) {
265            throw new DagEngineException(e);
266        }
267    }
268
269    /* (non-Javadoc)
270     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
271     */
272    @Override
273    public void change(String jobId, String changeValue) throws DagEngineException {
274        // This code should not be reached.
275        throw new DagEngineException(ErrorCode.E1017);
276    }
277
278    /**
279     * Rerun a job.
280     *
281     * @param jobId job Id to rerun.
282     * @param conf configuration information for the rerun.
283     * @throws DagEngineException thrown if the job could not be rerun.
284     */
285    @Override
286    public void reRun(String jobId, Configuration conf) throws DagEngineException {
287        try {
288            WorkflowJobBean wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
289            Configuration wfConf = new XConfiguration(new StringReader(wfBean.getConf()));
290            XConfiguration.copy(conf, wfConf);
291            validateReRunConfiguration(wfConf);
292            new ReRunXCommand(jobId, wfConf).call();
293        }
294        catch (CommandException ex) {
295            throw new DagEngineException(ex);
296        }
297        catch (JPAExecutorException ex) {
298            throw new DagEngineException(ex);
299        }
300        catch (IOException ex) {
301            throw new DagEngineException(ErrorCode.E0803, ex.getMessage());
302        }
303    }
304
305    private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
306        if (conf.get(OozieClient.APP_PATH) == null) {
307            throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
308        }
309        if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
310            throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
311                    + OozieClient.RERUN_FAIL_NODES);
312        }
313        if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
314            throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
315                    + OozieClient.RERUN_FAIL_NODES);
316        }
317    }
318
319    /**
320     * Process an action callback.
321     *
322     * @param actionId the action Id.
323     * @param externalStatus the action external status.
324     * @param actionData the action output data, <code>null</code> if none.
325     * @throws DagEngineException thrown if the callback could not be processed.
326     */
327    public void processCallback(String actionId, String externalStatus, Properties actionData)
328            throws DagEngineException {
329        XLog.Info.get().clearParameter(XLogService.GROUP);
330        XLog.Info.get().clearParameter(XLogService.USER);
331        XCallable<Void> command = null;
332
333                command = new CompletedActionXCommand(actionId, externalStatus,
334                                actionData, HIGH_PRIORITY);
335        if (!Services.get().get(CallableQueueService.class).queue(command)) {
336            LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
337        }
338    }
339
340    /**
341     * Return the info about a job.
342     *
343     * @param jobId job Id.
344     * @return the workflow job info.
345     * @throws DagEngineException thrown if the job info could not be obtained.
346     */
347    @Override
348    public WorkflowJob getJob(String jobId) throws DagEngineException {
349        try {
350                return new JobXCommand(jobId).call();
351        }
352        catch (CommandException ex) {
353            throw new DagEngineException(ex);
354        }
355    }
356
357    /**
358     * Return the info about a job with actions subset.
359     *
360     * @param jobId job Id
361     * @param start starting from this index in the list of actions belonging to the job
362     * @param length number of actions to be returned
363     * @return the workflow job info.
364     * @throws DagEngineException thrown if the job info could not be obtained.
365     */
366    @Override
367    public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
368        try {
369                        return new JobXCommand(jobId, start, length).call();
370        }
371        catch (CommandException ex) {
372            throw new DagEngineException(ex);
373        }
374    }
375
376    /**
377     * Return the a job definition.
378     *
379     * @param jobId job Id.
380     * @return the job definition.
381     * @throws DagEngineException thrown if the job definition could no be obtained.
382     */
383    @Override
384    public String getDefinition(String jobId) throws DagEngineException {
385        try {
386                        return new DefinitionXCommand(jobId).call();
387        }
388        catch (CommandException ex) {
389            throw new DagEngineException(ex);
390        }
391    }
392
393    /**
394     * Stream the log of a job.
395     *
396     * @param jobId job Id.
397     * @param writer writer to stream the log to.
398     * @param params additional parameters from the request
399     * @throws IOException thrown if the log cannot be streamed.
400     * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
401     */
402    @Override
403    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
404            DagEngineException {
405        try {
406            XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
407            filter.setParameter(DagXLogInfoService.JOB, jobId);
408            WorkflowJob job = getJob(jobId);
409            Date lastTime = job.getEndTime();
410            if (lastTime == null) {
411                lastTime = job.getLastModifiedTime();
412            }
413            Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
414        }
415        catch (Exception e) {
416            throw new IOException(e);
417        }
418    }
419
420    private static final Set<String> FILTER_NAMES = new HashSet<String>();
421
422    static {
423        FILTER_NAMES.add(OozieClient.FILTER_USER);
424        FILTER_NAMES.add(OozieClient.FILTER_NAME);
425        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
426        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
427        FILTER_NAMES.add(OozieClient.FILTER_ID);
428    }
429
430    /**
431     * Validate a jobs filter.
432     *
433     * @param filter filter to validate.
434     * @return the parsed filter.
435     * @throws DagEngineException thrown if the filter is invalid.
436     */
437    protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
438        Map<String, List<String>> map = new HashMap<String, List<String>>();
439        if (filter != null) {
440            StringTokenizer st = new StringTokenizer(filter, ";");
441            while (st.hasMoreTokens()) {
442                String token = st.nextToken();
443                if (token.contains("=")) {
444                    String[] pair = token.split("=");
445                    if (pair.length != 2) {
446                        throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
447                    }
448                    if (!FILTER_NAMES.contains(pair[0])) {
449                        throw new DagEngineException(ErrorCode.E0420, filter, XLog
450                                .format("invalid name [{0}]", pair[0]));
451                    }
452                    if (pair[0].equals("status")) {
453                        try {
454                            WorkflowJob.Status.valueOf(pair[1]);
455                        }
456                        catch (IllegalArgumentException ex) {
457                            throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
458                                                                                              pair[1]));
459                        }
460                    }
461                    List<String> list = map.get(pair[0]);
462                    if (list == null) {
463                        list = new ArrayList<String>();
464                        map.put(pair[0], list);
465                    }
466                    list.add(pair[1]);
467                }
468                else {
469                    throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
470                }
471            }
472        }
473        return map;
474    }
475
476    /**
477     * Return the info about a set of jobs.
478     *
479     * @param filter job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
480     * @param start offset, base 1.
481     * @param len number of jobs to return.
482     * @return job info for all matching jobs, the jobs don't contain node action information.
483     * @throws DagEngineException thrown if the jobs info could not be obtained.
484     */
485    public WorkflowsInfo getJobs(String filter, int start, int len) throws DagEngineException {
486        Map<String, List<String>> filterList = parseFilter(filter);
487        try {
488                        return new JobsXCommand(filterList, start, len).call();
489        }
490        catch (CommandException dce) {
491            throw new DagEngineException(dce);
492        }
493    }
494
495    /**
496     * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
497     *
498     * @param externalId external ID provided at job submission time.
499     * @return the associated workflow job ID if any, <code>null</code> if none.
500     * @throws DagEngineException thrown if the lookup could not be done.
501     */
502    @Override
503    public String getJobIdForExternalId(String externalId) throws DagEngineException {
504        try {
505                        return new ExternalIdXCommand(externalId).call();
506        }
507        catch (CommandException dce) {
508            throw new DagEngineException(dce);
509        }
510    }
511
512    @Override
513    public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
514        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
515    }
516
517    @Override
518    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
519            throws BaseEngineException {
520        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
521    }
522
523    public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
524        try {
525                        return new WorkflowActionInfoXCommand(actionId).call();
526        }
527        catch (CommandException ex) {
528            throw new BaseEngineException(ex);
529        }
530    }
531
532    /* (non-Javadoc)
533     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
534     */
535    @Override
536    public String dryRunSubmit(Configuration conf) throws BaseEngineException {
537        try {
538            SubmitXCommand submit = new SubmitXCommand(true, conf);
539            return submit.call();
540        } catch (CommandException ex) {
541            throw new DagEngineException(ex);
542        }
543    }
544}