001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import org.apache.oozie.util.XLogStreamer;
021    import org.apache.oozie.service.XLogService;
022    import org.apache.oozie.service.DagXLogInfoService;
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.oozie.client.CoordinatorJob;
025    import org.apache.oozie.client.WorkflowJob;
026    import org.apache.oozie.client.OozieClient;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.wf.CompletedActionXCommand;
029    import org.apache.oozie.command.wf.DefinitionXCommand;
030    import org.apache.oozie.command.wf.ExternalIdXCommand;
031    import org.apache.oozie.command.wf.JobXCommand;
032    import org.apache.oozie.command.wf.JobsXCommand;
033    import org.apache.oozie.command.wf.KillXCommand;
034    import org.apache.oozie.command.wf.ReRunXCommand;
035    import org.apache.oozie.command.wf.ResumeXCommand;
036    import org.apache.oozie.command.wf.StartXCommand;
037    import org.apache.oozie.command.wf.SubmitHttpXCommand;
038    import org.apache.oozie.command.wf.SubmitMRXCommand;
039    import org.apache.oozie.command.wf.SubmitPigXCommand;
040    import org.apache.oozie.command.wf.SubmitXCommand;
041    import org.apache.oozie.command.wf.SuspendXCommand;
042    import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
043    import org.apache.oozie.service.Services;
044    import org.apache.oozie.service.CallableQueueService;
045    import org.apache.oozie.util.ParamChecker;
046    import org.apache.oozie.util.XCallable;
047    import org.apache.oozie.util.XLog;
048    
049    import java.io.Writer;
050    import java.util.Date;
051    import java.util.List;
052    import java.util.Properties;
053    import java.util.Set;
054    import java.util.HashSet;
055    import java.util.StringTokenizer;
056    import java.util.Map;
057    import java.util.HashMap;
058    import java.util.ArrayList;
059    import java.io.IOException;
060    
061    /**
062     * The DagEngine provides all the DAG engine functionality for WS calls.
063     */
064    public class DagEngine extends BaseEngine {
065    
066        private static final int HIGH_PRIORITY = 2;
067        private static XLog LOG = XLog.getLog(DagEngine.class);
068    
069        /**
070         * Create a system Dag engine, with no user and no group.
071         */
072        public DagEngine() {
073            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
074                LOG.debug("Oozie DagEngine is not using XCommands.");
075            }
076            else {
077                LOG.debug("Oozie DagEngine is using XCommands.");
078            }
079        }
080    
081        /**
082         * Create a Dag engine to perform operations on behave of a user.
083         *
084         * @param user user name.
085         * @param authToken the authentication token.
086         */
087        public DagEngine(String user, String authToken) {
088            this();
089    
090            this.user = ParamChecker.notEmpty(user, "user");
091            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
092        }
093    
094        /**
095         * Submit a workflow job. <p/> It validates configuration properties.
096         *
097         * @param conf job configuration.
098         * @param startJob indicates if the job should be started or not.
099         * @return the job Id.
100         * @throws DagEngineException thrown if the job could not be created.
101         */
102        @Override
103        public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
104            validateSubmitConfiguration(conf);
105    
106            try {
107                            String jobId;
108                            SubmitXCommand submit = new SubmitXCommand(conf, getAuthToken());
109                            jobId = submit.call();
110                            if (startJob) {
111                                    start(jobId);
112                            }
113                            return jobId;
114            }
115            catch (CommandException ex) {
116                throw new DagEngineException(ex);
117            }
118        }
119    
120        /**
121         * Submit a pig/mapreduce job through HTTP.
122         * <p/>
123         * It validates configuration properties.
124         *
125         * @param conf job configuration.
126         * @param jobType job type - can be "pig" or "mapreduce".
127         * @return the job Id.
128         * @throws DagEngineException thrown if the job could not be created.
129         */
130        public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
131            validateSubmitConfiguration(conf);
132    
133            try {
134                String jobId;
135                SubmitHttpXCommand submit = null;
136                if (jobType.equals("pig")) {
137                    submit = new SubmitPigXCommand(conf, getAuthToken());
138                }
139                else if (jobType.equals("mapreduce")) {
140                    submit = new SubmitMRXCommand(conf, getAuthToken());
141                }
142    
143                jobId = submit.call();
144                start(jobId);
145                return jobId;
146            }
147            catch (CommandException ex) {
148                throw new DagEngineException(ex);
149            }
150        }
151    
152        private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
153            if (conf.get(OozieClient.APP_PATH) == null) {
154                throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
155            }
156        }
157    
158        /**
159         * Start a job.
160         *
161         * @param jobId job Id.
162         * @throws DagEngineException thrown if the job could not be started.
163         */
164        @Override
165        public void start(String jobId) throws DagEngineException {
166            // Changing to synchronous call from asynchronous queuing to prevent the
167            // loss of command if the queue is full or the queue is lost in case of
168            // failure.
169            try {
170                    new StartXCommand(jobId).call();
171            }
172            catch (CommandException e) {
173                throw new DagEngineException(e);
174            }
175        }
176    
177        /**
178         * Resume a job.
179         *
180         * @param jobId job Id.
181         * @throws DagEngineException thrown if the job could not be resumed.
182         */
183        @Override
184        public void resume(String jobId) throws DagEngineException {
185            // Changing to synchronous call from asynchronous queuing to prevent the
186            // loss of command if the queue is full or the queue is lost in case of
187            // failure.
188            try {
189                    new ResumeXCommand(jobId).call();
190            }
191            catch (CommandException e) {
192                throw new DagEngineException(e);
193            }
194        }
195    
196        /**
197         * Suspend a job.
198         *
199         * @param jobId job Id.
200         * @throws DagEngineException thrown if the job could not be suspended.
201         */
202        @Override
203        public void suspend(String jobId) throws DagEngineException {
204            // Changing to synchronous call from asynchronous queuing to prevent the
205            // loss of command if the queue is full or the queue is lost in case of
206            // failure.
207            try {
208                            new SuspendXCommand(jobId).call();
209            }
210            catch (CommandException e) {
211                throw new DagEngineException(e);
212            }
213        }
214    
215        /**
216         * Kill a job.
217         *
218         * @param jobId job Id.
219         * @throws DagEngineException thrown if the job could not be killed.
220         */
221        @Override
222        public void kill(String jobId) throws DagEngineException {
223            // Changing to synchronous call from asynchronous queuing to prevent the
224            // loss of command if the queue is full or the queue is lost in case of
225            // failure.
226            try {
227                            new KillXCommand(jobId).call();
228                            LOG.info("User " + user + " killed the WF job " + jobId);
229            }
230            catch (CommandException e) {
231                throw new DagEngineException(e);
232            }
233        }
234    
235        /* (non-Javadoc)
236         * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
237         */
238        @Override
239        public void change(String jobId, String changeValue) throws DagEngineException {
240            // This code should not be reached.
241            throw new DagEngineException(ErrorCode.E1017);
242        }
243    
244        /**
245         * Rerun a job.
246         *
247         * @param jobId job Id to rerun.
248         * @param conf configuration information for the rerun.
249         * @throws DagEngineException thrown if the job could not be rerun.
250         */
251        @Override
252        public void reRun(String jobId, Configuration conf) throws DagEngineException {
253            try {
254                validateReRunConfiguration(conf);
255                            new ReRunXCommand(jobId, conf, getAuthToken()).call();
256                start(jobId);
257            }
258            catch (CommandException ex) {
259                throw new DagEngineException(ex);
260            }
261        }
262    
263        private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
264            if (conf.get(OozieClient.APP_PATH) == null) {
265                throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
266            }
267            if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
268                throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
269                        + OozieClient.RERUN_FAIL_NODES);
270            }
271            if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
272                throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
273                        + OozieClient.RERUN_FAIL_NODES);
274            }
275        }
276    
277        /**
278         * Process an action callback.
279         *
280         * @param actionId the action Id.
281         * @param externalStatus the action external status.
282         * @param actionData the action output data, <code>null</code> if none.
283         * @throws DagEngineException thrown if the callback could not be processed.
284         */
285        public void processCallback(String actionId, String externalStatus, Properties actionData)
286                throws DagEngineException {
287            XLog.Info.get().clearParameter(XLogService.GROUP);
288            XLog.Info.get().clearParameter(XLogService.USER);
289            XCallable<Void> command = null;
290    
291                    command = new CompletedActionXCommand(actionId, externalStatus,
292                                    actionData, HIGH_PRIORITY);
293            if (!Services.get().get(CallableQueueService.class).queue(command)) {
294                LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
295            }
296        }
297    
298        /**
299         * Return the info about a job.
300         *
301         * @param jobId job Id.
302         * @return the workflow job info.
303         * @throws DagEngineException thrown if the job info could not be obtained.
304         */
305        @Override
306        public WorkflowJob getJob(String jobId) throws DagEngineException {
307            try {
308                    return new JobXCommand(jobId).call();
309            }
310            catch (CommandException ex) {
311                throw new DagEngineException(ex);
312            }
313        }
314    
315        /**
316         * Return the info about a job with actions subset.
317         *
318         * @param jobId job Id
319         * @param start starting from this index in the list of actions belonging to the job
320         * @param length number of actions to be returned
321         * @return the workflow job info.
322         * @throws DagEngineException thrown if the job info could not be obtained.
323         */
324        @Override
325        public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
326            try {
327                            return new JobXCommand(jobId, start, length).call();
328            }
329            catch (CommandException ex) {
330                throw new DagEngineException(ex);
331            }
332        }
333    
334        /**
335         * Return the a job definition.
336         *
337         * @param jobId job Id.
338         * @return the job definition.
339         * @throws DagEngineException thrown if the job definition could no be obtained.
340         */
341        @Override
342        public String getDefinition(String jobId) throws DagEngineException {
343            try {
344                            return new DefinitionXCommand(jobId).call();
345            }
346            catch (CommandException ex) {
347                throw new DagEngineException(ex);
348            }
349        }
350    
351        /**
352         * Stream the log of a job.
353         *
354         * @param jobId job Id.
355         * @param writer writer to stream the log to.
356         * @throws IOException thrown if the log cannot be streamed.
357         * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
358         */
359        @Override
360        public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
361            XLogStreamer.Filter filter = new XLogStreamer.Filter();
362            filter.setParameter(DagXLogInfoService.JOB, jobId);
363            WorkflowJob job = getJob(jobId);
364            Date lastTime = job.getEndTime();
365            if (lastTime == null) {
366                lastTime = job.getLastModifiedTime();
367            }
368            Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
369        }
370    
371        private static final Set<String> FILTER_NAMES = new HashSet<String>();
372    
373        static {
374            FILTER_NAMES.add(OozieClient.FILTER_USER);
375            FILTER_NAMES.add(OozieClient.FILTER_NAME);
376            FILTER_NAMES.add(OozieClient.FILTER_GROUP);
377            FILTER_NAMES.add(OozieClient.FILTER_STATUS);
378            FILTER_NAMES.add(OozieClient.FILTER_ID);
379        }
380    
381        /**
382         * Validate a jobs filter.
383         *
384         * @param filter filter to validate.
385         * @return the parsed filter.
386         * @throws DagEngineException thrown if the filter is invalid.
387         */
388        protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
389            Map<String, List<String>> map = new HashMap<String, List<String>>();
390            if (filter != null) {
391                StringTokenizer st = new StringTokenizer(filter, ";");
392                while (st.hasMoreTokens()) {
393                    String token = st.nextToken();
394                    if (token.contains("=")) {
395                        String[] pair = token.split("=");
396                        if (pair.length != 2) {
397                            throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
398                        }
399                        if (!FILTER_NAMES.contains(pair[0])) {
400                            throw new DagEngineException(ErrorCode.E0420, filter, XLog
401                                    .format("invalid name [{0}]", pair[0]));
402                        }
403                        if (pair[0].equals("status")) {
404                            try {
405                                WorkflowJob.Status.valueOf(pair[1]);
406                            }
407                            catch (IllegalArgumentException ex) {
408                                throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
409                                                                                                  pair[1]));
410                            }
411                        }
412                        List<String> list = map.get(pair[0]);
413                        if (list == null) {
414                            list = new ArrayList<String>();
415                            map.put(pair[0], list);
416                        }
417                        list.add(pair[1]);
418                    }
419                    else {
420                        throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
421                    }
422                }
423            }
424            return map;
425        }
426    
427        /**
428         * Return the info about a set of jobs.
429         *
430         * @param filter job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
431         * @param start offset, base 1.
432         * @param len number of jobs to return.
433         * @return job info for all matching jobs, the jobs don't contain node action information.
434         * @throws DagEngineException thrown if the jobs info could not be obtained.
435         */
436        public WorkflowsInfo getJobs(String filter, int start, int len) throws DagEngineException {
437            Map<String, List<String>> filterList = parseFilter(filter);
438            try {
439                            return new JobsXCommand(filterList, start, len).call();
440            }
441            catch (CommandException dce) {
442                throw new DagEngineException(dce);
443            }
444        }
445    
446        /**
447         * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
448         *
449         * @param externalId external ID provided at job submission time.
450         * @return the associated workflow job ID if any, <code>null</code> if none.
451         * @throws DagEngineException thrown if the lookup could not be done.
452         */
453        @Override
454        public String getJobIdForExternalId(String externalId) throws DagEngineException {
455            try {
456                            return new ExternalIdXCommand(externalId).call();
457            }
458            catch (CommandException dce) {
459                throw new DagEngineException(dce);
460            }
461        }
462    
463        @Override
464        public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
465            throw new BaseEngineException(new XException(ErrorCode.E0301));
466        }
467    
468        @Override
469        public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException {
470            throw new BaseEngineException(new XException(ErrorCode.E0301));
471        }
472    
473        public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
474            try {
475                            return new WorkflowActionInfoXCommand(actionId).call();
476            }
477            catch (CommandException ex) {
478                throw new BaseEngineException(ex);
479            }
480        }
481    
482        @Override
483        public String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException {
484            return null;
485        }
486    }