This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.oozie.BundleActionBean;
029import org.apache.oozie.BundleJobBean;
030import org.apache.oozie.CoordinatorActionBean;
031import org.apache.oozie.CoordinatorJobBean;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.WorkflowActionBean;
034import org.apache.oozie.client.Job;
035import org.apache.oozie.client.OozieClient;
036import org.apache.oozie.command.CommandException;
037import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
038import org.apache.oozie.command.coord.CoordActionReadyXCommand;
039import org.apache.oozie.command.coord.CoordActionStartXCommand;
040import org.apache.oozie.command.coord.CoordKillXCommand;
041import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
042import org.apache.oozie.command.coord.CoordResumeXCommand;
043import org.apache.oozie.command.coord.CoordSubmitXCommand;
044import org.apache.oozie.command.coord.CoordSuspendXCommand;
045import org.apache.oozie.command.wf.ActionEndXCommand;
046import org.apache.oozie.command.wf.ActionStartXCommand;
047import org.apache.oozie.command.wf.KillXCommand;
048import org.apache.oozie.command.wf.ResumeXCommand;
049import org.apache.oozie.command.wf.SignalXCommand;
050import org.apache.oozie.command.wf.SuspendXCommand;
051import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
052import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
053import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
054import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
055import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
056import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
057import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
058import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
059import org.apache.oozie.executor.jpa.JPAExecutorException;
060import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
061import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
062import org.apache.oozie.util.ELUtils;
063import org.apache.oozie.util.JobUtils;
064import org.apache.oozie.util.XCallable;
065import org.apache.oozie.util.XConfiguration;
066import org.apache.oozie.util.XLog;
067import org.apache.oozie.util.XmlUtils;
068import org.jdom.Attribute;
069import org.jdom.Element;
070
071/**
072 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
073 * queues them for execution.
074 */
075public class RecoveryService implements Service {
076
077    public static final String RECOVERY_SERVICE_CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
078    public static final String CONF_PREFIX_WF_ACTIONS = RECOVERY_SERVICE_CONF_PREFIX + "wf.actions.";
079    public static final String CONF_PREFIX_COORD = RECOVERY_SERVICE_CONF_PREFIX + "coord.";
080    public static final String CONF_PREFIX_BUNDLE = RECOVERY_SERVICE_CONF_PREFIX + "bundle.";
081    /**
082     * Time interval, in seconds, at which the recovery service will be scheduled to run.
083     */
084    public static final String CONF_SERVICE_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "interval";
085    /**
086     * The number of callables to be queued in a batch.
087     */
088    public static final String CONF_CALLABLE_BATCH_SIZE = RECOVERY_SERVICE_CONF_PREFIX + "callable.batch.size";
089
090    /**
091     * Delay for the push missing dependencies in milliseconds.
092     */
093    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "push.dependency.interval";
094
095    /**
096     * Age of actions to queue, in seconds.
097     */
098    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
099
100    public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval";
101
102    /**
103     * Age of coordinator jobs to recover, in seconds.
104     */
105    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
106
107    /**
108     * Age of Bundle jobs to recover, in seconds.
109     */
110    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
111
112    private static final String INSTRUMENTATION_GROUP = "recovery";
113    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
114    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
115    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
116
117    public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000;
118
119
120
121    /**
122     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
123     * queuing of commands.
124     */
125    static class RecoveryRunnable implements Runnable {
126        private final long olderThan;
127        private final long coordOlderThan;
128        private final long bundleOlderThan;
129        private long delay = 0;
130        private List<XCallable<?>> callables;
131        private List<XCallable<?>> delayedCallables;
132        private StringBuilder msg = null;
133        private JPAService jpaService = null;
134
135        public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
136            this.olderThan = olderThan;
137            this.coordOlderThan = coordOlderThan;
138            this.bundleOlderThan = bundleOlderThan;
139        }
140
141        public void run() {
142            XLog.Info.get().clear();
143            XLog log = XLog.getLog(getClass());
144            msg = new StringBuilder();
145            jpaService = Services.get().get(JPAService.class);
146            runWFRecovery();
147            runCoordActionRecovery();
148            runCoordActionRecoveryForReady();
149            runBundleRecovery();
150            log.debug("QUEUING [{0}] for potential recovery", msg.toString());
151            boolean ret = false;
152            if (null != callables) {
153                ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
154                if (ret == false) {
155                    log.warn("Unable to queue the callables commands for RecoveryService. "
156                            + "Most possibly command queue is full. Queue size is :"
157                            + Services.get().get(CallableQueueService.class).queueSize());
158                }
159                callables = null;
160            }
161            if (null != delayedCallables) {
162                ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
163                if (ret == false) {
164                    log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
165                            + "Most possibly Callable queue is full. Queue size is :"
166                            + Services.get().get(CallableQueueService.class).queueSize());
167                }
168                delayedCallables = null;
169                this.delay = 0;
170            }
171        }
172
173        private void runBundleRecovery(){
174            XLog.Info.get().clear();
175            XLog log = XLog.getLog(getClass());
176            List<BundleActionBean> bactions = null;
177            try {
178                bactions = BundleActionQueryExecutor.getInstance().getList(
179                        BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan);
180            }
181            catch (JPAExecutorException ex) {
182                log.warn("Error reading bundle actions from database", ex);
183                return;
184            }
185            msg.append(", BUNDLE_ACTIONS : " + bactions.size());
186            for (BundleActionBean baction : bactions) {
187                try {
188                    Services.get().get(InstrumentationService.class).get()
189                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
190                    if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) {
191                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
192                        continue;
193                    }
194                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) {
195                        if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) {
196                            BundleJobBean bundleJob = null;
197                            if (jpaService != null) {
198                                bundleJob = BundleJobQueryExecutor.getInstance().get(
199                                        BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
200                            }
201                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
202                            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
203                            for (Element coordElem : coordElems) {
204                                Attribute name = coordElem.getAttribute("name");
205                                String coordName=name.getValue();
206                                Configuration coordConf = mergeConfig(coordElem, bundleJob);
207                                try {
208                                    coordName = ELUtils.resolveAppName(coordName, coordConf);
209                                }
210                                catch (Exception e) {
211                                    log.error("Error evaluating coord name " + e.getMessage(), e);
212                                    continue;
213                                }
214                                if (coordName.equals(baction.getCoordName())) {
215                                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
216                                    queueCallable(new CoordSubmitXCommand(coordConf,
217                                            bundleJob.getId(), coordName));
218                                }
219                            }
220                        }
221                        else if (baction.getStatus() == Job.Status.KILLED) {
222                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
223                        }
224                        else if (baction.getStatus() == Job.Status.SUSPENDED
225                                || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
226                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
227                        }
228                        else if (baction.getStatus() == Job.Status.RUNNING
229                                || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
230                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
231                        }
232                    }
233                }
234                catch (Exception ex) {
235                    log.error("Exception, {0}", ex.getMessage(), ex);
236                }
237            }
238
239        }
240
241        /**
242         * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
243         */
244        private void runCoordActionRecovery() {
245            XLog.Info.get().clear();
246            XLog log = XLog.getLog(getClass());
247            long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL);
248            long pushMissingDepDelay = pushMissingDepInterval;
249            List<CoordinatorActionBean> cactions = null;
250            try {
251                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
252            }
253            catch (JPAExecutorException ex) {
254                log.warn("Error reading coord actions from database", ex);
255                return;
256            }
257            msg.append(", COORD_ACTIONS : " + cactions.size());
258            for (CoordinatorActionBean caction : cactions) {
259                try {
260                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) {
261                        Services.get().get(InstrumentationService.class).get()
262                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
263                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
264                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
265                            log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
266                                    + caction.getId());
267                            if (caction.getPushMissingDependencies() != null
268                                    && caction.getPushMissingDependencies().length() != 0) {
269                                queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
270                                        pushMissingDepDelay);
271                                pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
272                                log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
273                                        + caction.getId());
274                            }
275                        }
276                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
277                            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
278                                    CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId());
279                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
280                                    coordJob.getAppName(), caction.getJobId()));
281
282                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
283                                    + caction.getId());
284                        }
285                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
286                            if (caction.getExternalId() != null && caction.getPending() > 1) {
287                                queueCallable(new SuspendXCommand(caction.getExternalId()));
288                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
289                                        + caction.getId());
290                            }
291                        }
292                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
293                            if (caction.getExternalId() != null) {
294                                queueCallable(new KillXCommand(caction.getExternalId()));
295                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
296                            }
297                        }
298                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
299                            if (caction.getExternalId() != null) {
300                                queueCallable(new ResumeXCommand(caction.getExternalId()));
301                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
302                            }
303                        }
304                    }
305                }
306                catch (Exception ex) {
307                    log.error("Exception, {0}", ex.getMessage(), ex);
308                }
309            }
310
311
312        }
313
314        /**
315         * Recover coordinator actions that are staying in READY too long
316         */
317        private void runCoordActionRecoveryForReady() {
318            XLog.Info.get().clear();
319            XLog log = XLog.getLog(getClass());
320
321            try {
322                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
323                jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids);
324                msg.append(", COORD_READY_JOBS : " + jobids.size());
325                for (String jobid : jobids) {
326                        queueCallable(new CoordActionReadyXCommand(jobid));
327
328                    log.info("Recover READY coord actions for jobid :" + jobid);
329                }
330            }
331            catch (Exception ex) {
332                log.error("Exception, {0}", ex.getMessage(), ex);
333            }
334        }
335
336        /**
337         * Recover wf actions
338         */
339        private void runWFRecovery() {
340            XLog.Info.get().clear();
341            XLog log = XLog.getLog(getClass());
342            // queue command for action recovery
343
344            long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL)
345                    * ONE_DAY_MILLISCONDS;
346
347            List<WorkflowActionBean> actions = null;
348            try {
349                actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
350                        olderThan, createdTimeInterval);
351            }
352            catch (JPAExecutorException ex) {
353                log.warn("Exception while reading pending actions from storage", ex);
354                return;
355            }
356            // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
357            // actions.size());
358            msg.append(" WF_ACTIONS " + actions.size());
359
360            for (WorkflowActionBean action : actions) {
361                try {
362                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) {
363                        Services.get().get(InstrumentationService.class).get()
364                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
365                        if (action.getStatus() == WorkflowActionBean.Status.PREP
366                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
367                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
368                        }
369                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
370                            Date nextRunTime = action.getPendingAge();
371                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
372                                    - System.currentTimeMillis());
373                        }
374                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
375                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
376                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
377                        }
378                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
379                            Date nextRunTime = action.getPendingAge();
380                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
381                                    - System.currentTimeMillis());
382
383                        }
384                        else if (action.getStatus() == WorkflowActionBean.Status.OK
385                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
386                            queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
387                        }
388                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
389                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
390                        }
391                    }
392                }
393                catch (Exception ex) {
394                    log.error("Exception, {0}", ex.getMessage(), ex);
395                }
396            }
397
398        }
399
400        /**
401         * Adds callables to a list. If the number of callables in the list reaches {@link
402         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
403         *
404         * @param callable the callable to queue.
405         */
406        private void queueCallable(XCallable<?> callable) {
407            if (callables == null) {
408                callables = new ArrayList<XCallable<?>>();
409            }
410            callables.add(callable);
411            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
412                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
413                if (ret == false) {
414                    XLog.getLog(getClass()).warn(
415                            "Unable to queue the callables commands for RecoveryService. "
416                                    + "Most possibly command queue is full. Queue size is :"
417                                    + Services.get().get(CallableQueueService.class).queueSize());
418                }
419                callables = new ArrayList<XCallable<?>>();
420            }
421        }
422
423        /**
424         * Adds callables to a list. If the number of callables in the list reaches {@link
425         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
426         * of the callables in the list. The callables list and the delay is reset.
427         *
428         * @param callable the callable to queue.
429         * @param delay the delay for the callable.
430         */
431        private void queueCallable(XCallable<?> callable, long delay) {
432            if (delayedCallables == null) {
433                delayedCallables = new ArrayList<XCallable<?>>();
434            }
435            this.delay = Math.max(this.delay, delay);
436            delayedCallables.add(callable);
437            if (delayedCallables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)){
438                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
439                if (ret == false) {
440                    XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
441                            + "Most possibly Callable queue is full. Queue size is :"
442                            + Services.get().get(CallableQueueService.class).queueSize());
443                }
444                delayedCallables = new ArrayList<XCallable<?>>();
445                this.delay = 0;
446            }
447        }
448    }
449
450    /**
451     * Initializes the RecoveryService.
452     *
453     * @param services services instance.
454     */
455    @Override
456    public void init(Services services) {
457        Configuration conf = services.getConf();
458        Runnable recoveryRunnable = new RecoveryRunnable(
459                ConfigurationService.getInt(conf, CONF_WF_ACTIONS_OLDER_THAN),
460                ConfigurationService.getInt(conf, CONF_COORD_OLDER_THAN),
461                ConfigurationService.getInt(conf, CONF_BUNDLE_OLDER_THAN));
462        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
463                                                      SchedulerService.Unit.SEC);
464    }
465
466    public int getRecoveryServiceInterval(Configuration conf){
467        return ConfigurationService.getInt(conf, CONF_SERVICE_INTERVAL);
468    }
469
470    /**
471     * Destroy the Recovery Service.
472     */
473    @Override
474    public void destroy() {
475    }
476
477    /**
478     * Return the public interface for the Recovery Service.
479     *
480     * @return {@link RecoveryService}.
481     */
482    @Override
483    public Class<? extends Service> getInterface() {
484        return RecoveryService.class;
485    }
486
487    /**
488     * Merge Bundle job config and the configuration from the coord job to pass
489     * to Coord Engine
490     *
491     * @param coordElem the coordinator configuration
492     * @return Configuration merged configuration
493     * @throws CommandException thrown if failed to merge configuration
494     */
495    private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
496        XLog.Info.get().clear();
497        XLog log = XLog.getLog("RecoveryService");
498
499        String jobConf = bundleJob.getConf();
500        // Step 1: runConf = jobConf
501        Configuration runConf = null;
502        try {
503            runConf = new XConfiguration(new StringReader(jobConf));
504        }
505        catch (IOException e1) {
506            log.warn("Configuration parse error in:" + jobConf);
507            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
508        }
509        // Step 2: Merge local properties into runConf
510        // extract 'property' tags under 'configuration' block in the coordElem
511        // convert Element to XConfiguration
512        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
513
514        if (localConfigElement != null) {
515            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
516            Configuration localConf;
517            try {
518                localConf = new XConfiguration(new StringReader(strConfig));
519            }
520            catch (IOException e1) {
521                log.warn("Configuration parse error in:" + strConfig);
522                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
523            }
524
525            // copy configuration properties in the coordElem to the runConf
526            XConfiguration.copy(localConf, runConf);
527        }
528
529        // Step 3: Extract value of 'app-path' in coordElem, save it as a
530        // new property called 'oozie.coord.application.path', and normalize.
531        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
532        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
533        // Normalize coordinator appPath here;
534        try {
535            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
536        }
537        catch (IOException e) {
538            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
539        }
540        return runConf;
541    }
542}