This project has retired. For details please refer to its Attic page.
001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import org.apache.oozie.util.XLogStreamer;
021    import org.apache.oozie.service.XLogService;
022    import org.apache.oozie.service.DagXLogInfoService;
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.client.CoordinatorJob;
025    import org.apache.oozie.client.WorkflowJob;
026    import org.apache.oozie.client.OozieClient;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.XCommand;
029    import org.apache.oozie.command.wf.CompletedActionCommand;
030    import org.apache.oozie.command.wf.CompletedActionXCommand;
031    import org.apache.oozie.command.wf.DefinitionCommand;
032    import org.apache.oozie.command.wf.DefinitionXCommand;
033    import org.apache.oozie.command.wf.ExternalIdCommand;
034    import org.apache.oozie.command.wf.ExternalIdXCommand;
035    import org.apache.oozie.command.wf.JobCommand;
036    import org.apache.oozie.command.wf.JobXCommand;
037    import org.apache.oozie.command.wf.JobsCommand;
038    import org.apache.oozie.command.wf.JobsXCommand;
039    import org.apache.oozie.command.wf.KillCommand;
040    import org.apache.oozie.command.wf.KillXCommand;
041    import org.apache.oozie.command.wf.ReRunCommand;
042    import org.apache.oozie.command.wf.ReRunXCommand;
043    import org.apache.oozie.command.wf.ResumeCommand;
044    import org.apache.oozie.command.wf.ResumeXCommand;
045    import org.apache.oozie.command.wf.StartCommand;
046    import org.apache.oozie.command.wf.StartXCommand;
047    import org.apache.oozie.command.wf.SubmitCommand;
048    import org.apache.oozie.command.wf.SubmitHttpCommand;
049    import org.apache.oozie.command.wf.SubmitHttpXCommand;
050    import org.apache.oozie.command.wf.SubmitMRCommand;
051    import org.apache.oozie.command.wf.SubmitMRXCommand;
052    import org.apache.oozie.command.wf.SubmitPigCommand;
053    import org.apache.oozie.command.wf.SubmitPigXCommand;
054    import org.apache.oozie.command.wf.SubmitXCommand;
055    import org.apache.oozie.command.wf.SuspendCommand;
056    import org.apache.oozie.command.wf.SuspendXCommand;
057    import org.apache.oozie.command.wf.WorkflowActionInfoCommand;
058    import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
059    import org.apache.oozie.service.Services;
060    import org.apache.oozie.service.CallableQueueService;
061    import org.apache.oozie.util.ParamChecker;
062    import org.apache.oozie.util.XCallable;
063    import org.apache.oozie.util.XLog;
064    
065    import java.io.Writer;
066    import java.util.Date;
067    import java.util.List;
068    import java.util.Properties;
069    import java.util.Set;
070    import java.util.HashSet;
071    import java.util.StringTokenizer;
072    import java.util.Map;
073    import java.util.HashMap;
074    import java.util.ArrayList;
075    import java.io.IOException;
076    
077    /**
078     * The DagEngine provides all the DAG engine functionality for WS calls.
079     */
080    public class DagEngine extends BaseEngine {
081    
082        private static final int HIGH_PRIORITY = 2;
083        private boolean useXCommand = true;
084        private static XLog LOG = XLog.getLog(DagEngine.class);
085    
086        /**
087         * Create a system Dag engine, with no user and no group.
088         */
089        public DagEngine() {
090            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
091                useXCommand = false;
092                LOG.debug("Oozie DagEngine is not using XCommands.");
093            }
094            else {
095                LOG.debug("Oozie DagEngine is using XCommands.");
096            }
097        }
098    
099        /**
100         * Create a Dag engine to perform operations on behave of a user.
101         *
102         * @param user user name.
103         * @param authToken the authentication token.
104         */
105        public DagEngine(String user, String authToken) {
106            this();
107            
108            this.user = ParamChecker.notEmpty(user, "user");
109            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
110        }
111    
112        /**
113         * Submit a workflow job. <p/> It validates configuration properties.
114         *
115         * @param conf job configuration.
116         * @param startJob indicates if the job should be started or not.
117         * @return the job Id.
118         * @throws DagEngineException thrown if the job could not be created.
119         */
120        @Override
121        public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
122            validateSubmitConfiguration(conf);
123            
124            try {
125                String jobId;
126                if (useXCommand) { 
127                    SubmitXCommand submit = new SubmitXCommand(conf, getAuthToken());
128                    jobId = submit.call();
129                }
130                else {
131                    SubmitCommand submit = new SubmitCommand(conf, getAuthToken());
132                    jobId = submit.call();
133                }
134                if (startJob) {
135                    start(jobId);
136                }
137                return jobId;
138            }
139            catch (CommandException ex) {
140                throw new DagEngineException(ex);
141            }
142        }
143    
144        /**
145         * Submit a pig/mapreduce job through HTTP.
146         * <p/>
147         * It validates configuration properties.
148         *
149         * @param conf job configuration.
150         * @param jobType job type - can be "pig" or "mapreduce".
151         * @return the job Id.
152         * @throws DagEngineException thrown if the job could not be created.
153         */
154        public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
155            validateSubmitConfiguration(conf);
156    
157            try {
158                String jobId;
159                if (useXCommand) {
160                    SubmitHttpXCommand submit = null;
161                    if (jobType.equals("pig")) {
162                        submit = new SubmitPigXCommand(conf, getAuthToken());
163                    }
164                    else if (jobType.equals("mapreduce")) {
165                        submit = new SubmitMRXCommand(conf, getAuthToken());
166                    }
167    
168                    jobId = submit.call();
169                }
170                else {
171                    SubmitHttpCommand submit = null;
172                    if (jobType.equals("pig")) {
173                        submit = new SubmitPigCommand(conf, getAuthToken());
174                    }
175                    else if (jobType.equals("mapreduce")) {
176                        submit = new SubmitMRCommand(conf, getAuthToken());
177                    }
178    
179                    jobId = submit.call(); 
180                }
181                start(jobId);
182                return jobId;
183            }
184            catch (CommandException ex) {
185                throw new DagEngineException(ex);
186            }
187        }
188    
189        private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
190            if (conf.get(OozieClient.APP_PATH) == null) {
191                throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
192            }
193        }
194    
195        /**
196         * Start a job.
197         *
198         * @param jobId job Id.
199         * @throws DagEngineException thrown if the job could not be started.
200         */
201        @Override
202        public void start(String jobId) throws DagEngineException {
203            // Changing to synchronous call from asynchronous queuing to prevent the
204            // loss of command if the queue is full or the queue is lost in case of
205            // failure.
206            try {
207                if (useXCommand) {
208                    new StartXCommand(jobId).call();
209                }
210                else {
211                    new StartCommand(jobId).call();
212                }
213            }
214            catch (CommandException e) {
215                throw new DagEngineException(e);
216            }
217        }
218    
219        /**
220         * Resume a job.
221         *
222         * @param jobId job Id.
223         * @throws DagEngineException thrown if the job could not be resumed.
224         */
225        @Override
226        public void resume(String jobId) throws DagEngineException {
227            // Changing to synchronous call from asynchronous queuing to prevent the
228            // loss of command if the queue is full or the queue is lost in case of
229            // failure.
230            try {
231                if (useXCommand) {
232                    new ResumeXCommand(jobId).call();
233                }
234                else {
235                    new ResumeCommand(jobId).call();
236                }
237            }
238            catch (CommandException e) {
239                throw new DagEngineException(e);
240            }
241        }
242    
243        /**
244         * Suspend a job.
245         *
246         * @param jobId job Id.
247         * @throws DagEngineException thrown if the job could not be suspended.
248         */
249        @Override
250        public void suspend(String jobId) throws DagEngineException {
251            // Changing to synchronous call from asynchronous queuing to prevent the
252            // loss of command if the queue is full or the queue is lost in case of
253            // failure.
254            try {
255                if (useXCommand) {
256                    new SuspendXCommand(jobId).call();
257                }
258                else {
259                    new SuspendCommand(jobId).call();
260                }
261            }
262            catch (CommandException e) {
263                throw new DagEngineException(e);
264            }
265        }
266    
267        /**
268         * Kill a job.
269         *
270         * @param jobId job Id.
271         * @throws DagEngineException thrown if the job could not be killed.
272         */
273        @Override
274        public void kill(String jobId) throws DagEngineException {
275            // Changing to synchronous call from asynchronous queuing to prevent the
276            // loss of command if the queue is full or the queue is lost in case of
277            // failure.
278            try {
279                if (useXCommand) {
280                    new KillXCommand(jobId).call();
281                }
282                else {
283                    new KillCommand(jobId).call();
284                }
285                LOG.info("User " + user + " killed the WF job " + jobId);
286            }
287            catch (CommandException e) {
288                throw new DagEngineException(e);
289            }
290        }
291    
292        /* (non-Javadoc)
293         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
294         */
295        @Override
296        public void change(String jobId, String changeValue) throws DagEngineException {
297            // This code should not be reached.
298            throw new DagEngineException(ErrorCode.E1017);
299        }
300    
301        /**
302         * Rerun a job.
303         *
304         * @param jobId job Id to rerun.
305         * @param conf configuration information for the rerun.
306         * @throws DagEngineException thrown if the job could not be rerun.
307         */
308        @Override
309        public void reRun(String jobId, Configuration conf) throws DagEngineException {
310            try {
311                validateReRunConfiguration(conf);
312                
313                if (useXCommand) {
314                    new ReRunXCommand(jobId, conf, getAuthToken()).call();
315                }
316                else {
317                    new ReRunCommand(jobId, conf, getAuthToken()).call();
318                }
319                start(jobId);
320            }
321            catch (CommandException ex) {
322                throw new DagEngineException(ex);
323            }
324        }
325    
326        private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
327            if (conf.get(OozieClient.APP_PATH) == null) {
328                throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
329            }
330            if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
331                throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
332                        + OozieClient.RERUN_FAIL_NODES);
333            }
334            if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
335                throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
336                        + OozieClient.RERUN_FAIL_NODES);
337            }
338        }
339    
340        /**
341         * Process an action callback.
342         *
343         * @param actionId the action Id.
344         * @param externalStatus the action external status.
345         * @param actionData the action output data, <code>null</code> if none.
346         * @throws DagEngineException thrown if the callback could not be processed.
347         */
348        public void processCallback(String actionId, String externalStatus, Properties actionData)
349                throws DagEngineException {
350            XLog.Info.get().clearParameter(XLogService.GROUP);
351            XLog.Info.get().clearParameter(XLogService.USER);
352            XCallable<Void> command = null;
353    
354            if (useXCommand) {
355                command = new CompletedActionXCommand(actionId, externalStatus, actionData, HIGH_PRIORITY);
356            }
357            else {
358                command = new CompletedActionCommand(actionId, externalStatus, actionData, HIGH_PRIORITY);
359            }
360            if (!Services.get().get(CallableQueueService.class).queue(command)) {
361                LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
362            }
363        }
364    
365        /**
366         * Return the info about a job.
367         *
368         * @param jobId job Id.
369         * @return the workflow job info.
370         * @throws DagEngineException thrown if the job info could not be obtained.
371         */
372        @Override
373        public WorkflowJob getJob(String jobId) throws DagEngineException {
374            try {
375                if (useXCommand) {
376                    return new JobXCommand(jobId).call();
377                }
378                else {
379                    return new JobCommand(jobId).call();
380                }
381            }
382            catch (CommandException ex) {
383                throw new DagEngineException(ex);
384            }
385        }
386    
387        /**
388         * Return the info about a job with actions subset.
389         *
390         * @param jobId job Id
391         * @param start starting from this index in the list of actions belonging to the job
392         * @param length number of actions to be returned
393         * @return the workflow job info.
394         * @throws DagEngineException thrown if the job info could not be obtained.
395         */
396        @Override
397        public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
398            try {
399                if (useXCommand) {
400                    return new JobXCommand(jobId, start, length).call();
401                }
402                else {
403                    return new JobCommand(jobId, start, length).call();
404                }
405            }
406            catch (CommandException ex) {
407                throw new DagEngineException(ex);
408            }
409        }
410    
411        /**
412         * Return the a job definition.
413         *
414         * @param jobId job Id.
415         * @return the job definition.
416         * @throws DagEngineException thrown if the job definition could no be obtained.
417         */
418        @Override
419        public String getDefinition(String jobId) throws DagEngineException {
420            try {
421                if (useXCommand) {
422                    return new DefinitionXCommand(jobId).call();
423                }
424                else {
425                    return new DefinitionCommand(jobId).call();
426                }
427            }
428            catch (CommandException ex) {
429                throw new DagEngineException(ex);
430            }
431        }
432    
433        /**
434         * Stream the log of a job.
435         *
436         * @param jobId job Id.
437         * @param writer writer to stream the log to.
438         * @throws IOException thrown if the log cannot be streamed.
439         * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
440         */
441        @Override
442        public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
443            XLogStreamer.Filter filter = new XLogStreamer.Filter();
444            filter.setParameter(DagXLogInfoService.JOB, jobId);
445            WorkflowJob job = getJob(jobId);
446            Date lastTime = job.getEndTime();
447            if (lastTime == null) {
448                lastTime = job.getLastModifiedTime();
449            }
450            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
451        }
452    
453        private static final Set<String> FILTER_NAMES = new HashSet<String>();
454    
455        static {
456            FILTER_NAMES.add(OozieClient.FILTER_USER);
457            FILTER_NAMES.add(OozieClient.FILTER_NAME);
458            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
459            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
460            FILTER_NAMES.add(OozieClient.FILTER_ID);
461        }
462    
463        /**
464         * Validate a jobs filter.
465         *
466         * @param filter filter to validate.
467         * @return the parsed filter.
468         * @throws DagEngineException thrown if the filter is invalid.
469         */
470        protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
471            Map<String, List<String>> map = new HashMap<String, List<String>>();
472            if (filter != null) {
473                StringTokenizer st = new StringTokenizer(filter, ";");
474                while (st.hasMoreTokens()) {
475                    String token = st.nextToken();
476                    if (token.contains("=")) {
477                        String[] pair = token.split("=");
478                        if (pair.length != 2) {
479                            throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
480                        }
481                        if (!FILTER_NAMES.contains(pair[0])) {
482                            throw new DagEngineException(ErrorCode.E0420, filter, XLog
483                                    .format("invalid name [{0}]", pair[0]));
484                        }
485                        if (pair[0].equals("status")) {
486                            try {
487                                WorkflowJob.Status.valueOf(pair[1]);
488                            }
489                            catch (IllegalArgumentException ex) {
490                                throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
491                                                                                                  pair[1]));
492                            }
493                        }
494                        List<String> list = map.get(pair[0]);
495                        if (list == null) {
496                            list = new ArrayList<String>();
497                            map.put(pair[0], list);
498                        }
499                        list.add(pair[1]);
500                    }
501                    else {
502                        throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
503                    }
504                }
505            }
506            return map;
507        }
508    
509        /**
510         * Return the info about a set of jobs.
511         *
512         * @param filterStr job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
513         * @param start offset, base 1.
514         * @param len number of jobs to return.
515         * @return job info for all matching jobs, the jobs don't contain node action information.
516         * @throws DagEngineException thrown if the jobs info could not be obtained.
517         */
518        public WorkflowsInfo getJobs(String filterStr, int start, int len) throws DagEngineException {
519            Map<String, List<String>> filter = parseFilter(filterStr);
520            try {
521                if (useXCommand) {
522                    return new JobsXCommand(filter, start, len).call();
523                }
524                else {
525                    return new JobsCommand(filter, start, len).call();
526                }
527            }
528            catch (CommandException dce) {
529                throw new DagEngineException(dce);
530            }
531        }
532    
533        /**
534         * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
535         *
536         * @param externalId external ID provided at job submission time.
537         * @return the associated workflow job ID if any, <code>null</code> if none.
538         * @throws DagEngineException thrown if the lookup could not be done.
539         */
540        @Override
541        public String getJobIdForExternalId(String externalId) throws DagEngineException {
542            try {
543                if (useXCommand) {
544                    return new ExternalIdXCommand(externalId).call();
545                }
546                else {
547                    return new ExternalIdCommand(externalId).call();
548                }
549            }
550            catch (CommandException dce) {
551                throw new DagEngineException(dce);
552            }
553        }
554    
555        @Override
556        public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
557            throw new BaseEngineException(new XException(ErrorCode.E0301));
558        }
559    
560        @Override
561        public CoordinatorJob getCoordJob(String jobId, int start, int length) throws BaseEngineException {
562            throw new BaseEngineException(new XException(ErrorCode.E0301));
563        }
564    
565        public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
566            try {
567                if (useXCommand) {
568                    return new WorkflowActionInfoXCommand(actionId).call();
569                }
570                else {
571                    return new WorkflowActionInfoCommand(actionId).call();
572                }
573            }
574            catch (CommandException ex) {
575                throw new BaseEngineException(ex);
576            }
577        }
578    
579        @Override
580        public String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException {
581            return null;
582        }
583    }