001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.sql.Timestamp;
024import java.util.ArrayList;
025import java.util.Date;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Set;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.BundleActionBean;
032import org.apache.oozie.BundleJobBean;
033import org.apache.oozie.CoordinatorActionBean;
034import org.apache.oozie.CoordinatorJobBean;
035import org.apache.oozie.ErrorCode;
036import org.apache.oozie.WorkflowActionBean;
037import org.apache.oozie.client.Job;
038import org.apache.oozie.client.OozieClient;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.bundle.BundleCoordSubmitXCommand;
041import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
042import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
043import org.apache.oozie.command.coord.CoordActionReadyXCommand;
044import org.apache.oozie.command.coord.CoordActionStartXCommand;
045import org.apache.oozie.command.coord.CoordKillXCommand;
046import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
047import org.apache.oozie.command.coord.CoordResumeXCommand;
048import org.apache.oozie.command.coord.CoordSuspendXCommand;
049import org.apache.oozie.command.wf.ActionEndXCommand;
050import org.apache.oozie.command.wf.ActionStartXCommand;
051import org.apache.oozie.command.wf.KillXCommand;
052import org.apache.oozie.command.wf.ResumeXCommand;
053import org.apache.oozie.command.wf.SignalXCommand;
054import org.apache.oozie.command.wf.SuspendXCommand;
055import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
056import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
057import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
058import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
059import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
060import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
061import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
062import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
063import org.apache.oozie.executor.jpa.JPAExecutorException;
064import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
065import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
066import org.apache.oozie.util.ELUtils;
067import org.apache.oozie.util.JobUtils;
068import org.apache.oozie.util.XCallable;
069import org.apache.oozie.util.XConfiguration;
070import org.apache.oozie.util.XLog;
071import org.apache.oozie.util.XmlUtils;
072import org.jdom.Attribute;
073import org.jdom.Element;
074
075/**
076 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
077 * queues them for execution.
078 */
079public class RecoveryService implements Service {
080
081    public static final String RECOVERY_SERVICE_CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
082    public static final String CONF_PREFIX_WF_ACTIONS = RECOVERY_SERVICE_CONF_PREFIX + "wf.actions.";
083    public static final String CONF_PREFIX_COORD = RECOVERY_SERVICE_CONF_PREFIX + "coord.";
084    public static final String CONF_PREFIX_BUNDLE = RECOVERY_SERVICE_CONF_PREFIX + "bundle.";
085    /**
086     * Time interval, in seconds, at which the recovery service will be scheduled to run.
087     */
088    public static final String CONF_SERVICE_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "interval";
089    /**
090     * The number of callables to be queued in a batch.
091     */
092    public static final String CONF_CALLABLE_BATCH_SIZE = RECOVERY_SERVICE_CONF_PREFIX + "callable.batch.size";
093
094    /**
095     * Delay for the push missing dependencies in milliseconds.
096     */
097    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "push.dependency.interval";
098
099    /**
100     * Age of actions to queue, in seconds.
101     */
102    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
103
104    public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval";
105
106    /**
107     * Age of coordinator jobs to recover, in seconds.
108     */
109    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
110
111    /**
112     * Age of Bundle jobs to recover, in seconds.
113     */
114    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
115
116    private static final String INSTRUMENTATION_GROUP = "recovery";
117    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
118    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
119    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
120
121    public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000;
122
123
124
125    /**
126     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
127     * queuing of commands.
128     */
129    static class RecoveryRunnable implements Runnable {
130        private final long olderThan;
131        private final long coordOlderThan;
132        private final long bundleOlderThan;
133        private long delay = 0;
134        private List<XCallable<?>> callables;
135        private List<XCallable<?>> delayedCallables;
136        private StringBuilder msg = null;
137        private JPAService jpaService = null;
138
139        public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
140            this.olderThan = olderThan;
141            this.coordOlderThan = coordOlderThan;
142            this.bundleOlderThan = bundleOlderThan;
143        }
144
145        public void run() {
146            XLog.Info.get().clear();
147            XLog log = XLog.getLog(getClass());
148            msg = new StringBuilder();
149            jpaService = Services.get().get(JPAService.class);
150            runWFRecovery();
151            runCoordActionRecovery();
152            runBundleRecovery();
153            log.debug("QUEUED [{0}] for potential recovery", msg.toString());
154            boolean ret = false;
155            if (null != callables) {
156                ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
157                if (ret == false) {
158                    log.warn("Unable to queue the callables commands for RecoveryService. "
159                            + "Most possibly command queue is full. Queue size is :"
160                            + Services.get().get(CallableQueueService.class).queueSize());
161                }
162                callables = null;
163            }
164            if (null != delayedCallables) {
165                ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
166                if (ret == false) {
167                    log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
168                            + "Most possibly Callable queue is full. Queue size is :"
169                            + Services.get().get(CallableQueueService.class).queueSize());
170                }
171                delayedCallables = null;
172                this.delay = 0;
173            }
174        }
175
176        private void runBundleRecovery(){
177            XLog.Info.get().clear();
178            XLog log = XLog.getLog(getClass());
179            List<BundleActionBean> bactions = null;
180            try {
181                bactions = BundleActionQueryExecutor.getInstance().getList(
182                        BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan);
183            }
184            catch (JPAExecutorException ex) {
185                log.warn("Error reading bundle actions from database", ex);
186                return;
187            }
188            msg.append(", BUNDLE_ACTIONS : ").append(bactions.size());
189            for (BundleActionBean baction : bactions) {
190                try {
191                    Services.get().get(InstrumentationService.class).get()
192                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
193                    if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) {
194                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
195                        continue;
196                    }
197                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) {
198                        if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) {
199
200                            CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
201                                    CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, baction.getCoordName(),
202                                    baction.getBundleId());
203
204                            if (coordJobs == null) {
205                                log.debug("Coord [{0}] for bundle [{1}] is not yet submitted , submitting new one",
206                                        baction.getCoordName(), baction.getBundleId());
207
208                                BundleJobBean bundleJob = null;
209                                if (jpaService != null) {
210                                    bundleJob = BundleJobQueryExecutor.getInstance().get(
211                                            BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
212                                }
213                                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
214                                @SuppressWarnings("unchecked")
215                                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
216                                for (Element coordElem : coordElems) {
217                                    Attribute name = coordElem.getAttribute("name");
218                                    String coordName = name.getValue();
219                                    Configuration coordConf = mergeConfig(coordElem, bundleJob);
220                                    try {
221                                        coordName = ELUtils.resolveAppName(coordName, coordConf);
222                                    }
223                                    catch (Exception e) {
224                                        log.error("Error evaluating coord name " + e.getMessage(), e);
225                                        continue;
226                                    }
227                                    if (coordName.equals(baction.getCoordName())) {
228                                        coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
229                                        queueCallable(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(),
230                                                coordName));
231                                    }
232                                }
233                            }
234                            else {
235                                log.debug(
236                                        "Coord [{0}] for bundle [{1}] is submitted , but bundle action is not updated.",
237                                        baction.getCoordName(), baction.getBundleId());
238                                coordJobs = CoordJobQueryExecutor.getInstance().getIfExist(
239                                        CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, baction.getCoordName(),
240                                        coordJobs.getId());
241                                queueCallable(new BundleStatusUpdateXCommand(coordJobs, baction.getStatus()));
242                            }
243                        }
244                        else if (baction.getStatus() == Job.Status.KILLED) {
245                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
246                        }
247                        else if (baction.getStatus() == Job.Status.SUSPENDED
248                                || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
249                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
250                        }
251                        else if (baction.getStatus() == Job.Status.RUNNING
252                                || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
253                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
254                        }
255                    }
256                }
257                catch (Exception ex) {
258                    log.error("Exception, {0}", ex.getMessage(), ex);
259                }
260            }
261
262        }
263
264        /**
265         * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
266         */
267        private void runCoordActionRecovery() {
268            Set<String> readyJobs = new HashSet<String>();
269            XLog.Info.get().clear();
270            XLog log = XLog.getLog(getClass());
271            long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL);
272            long pushMissingDepDelay = pushMissingDepInterval;
273            Timestamp ts = new Timestamp(System.currentTimeMillis() - this.coordOlderThan * 1000);
274
275            List<CoordinatorActionBean> cactions = new ArrayList<CoordinatorActionBean>();
276            try {
277                cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
278                        CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, ts));
279                cactions.addAll(CoordActionQueryExecutor.getInstance().getList(
280                        CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, ts));
281
282            }
283            catch (JPAExecutorException ex) {
284                log.warn("Error reading coord actions from database", ex);
285                return;
286            }
287            msg.append(", COORD_ACTIONS : " + cactions.size());
288            for (CoordinatorActionBean caction : cactions) {
289                try {
290                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) {
291                        Services.get().get(InstrumentationService.class).get()
292                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
293                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
294                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
295                            log.debug("Recover a coord action from [WAITING] and resubmit CoordActionInputCheckXCommand :[{0}]"
296                                    , caction.getId());
297                            if (caction.getPushMissingDependencies() != null
298                                    && caction.getPushMissingDependencies().length() != 0) {
299                                queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
300                                        pushMissingDepDelay);
301                                pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
302                                log.debug("Recover a coord action from [WAITING] and resubmit CoordPushDependencyCheckX :[{0}]"
303                                        , caction.getId());
304                            }
305                        }
306                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
307                            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
308                                    CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId());
309                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
310                                    coordJob.getAppName(), caction.getJobId()));
311                            log.debug("Recover a coord action from [SUBMITTED] and resubmit CoordActionStartCommand :[{0}]",
312                                    caction.getId());
313                        }
314                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
315                            if (caction.getExternalId() != null && caction.getPending() > 1) {
316                                queueCallable(new SuspendXCommand(caction.getExternalId()));
317                                log.debug("Recover a coord action from [SUSPENDED] and resubmit SuspendXCommand :[{0}]"
318                                        , caction.getId());
319                            }
320                        }
321                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
322                            if (caction.getExternalId() != null) {
323                                queueCallable(new KillXCommand(caction.getExternalId()));
324                                log.debug("Recover a coord action from [KILLED] and resubmit KillXCommand :[{0}]"
325                                        , caction.getId());
326                            }
327                        }
328                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
329                            if (caction.getExternalId() != null) {
330                                queueCallable(new ResumeXCommand(caction.getExternalId()));
331                                log.debug("Recover a coord action from [RUNNING] and resubmit ResumeXCommand :[{0}]"
332                                        , caction.getId());
333                            }
334                        }
335                        else if (caction.getStatus() == CoordinatorActionBean.Status.READY) {
336                            readyJobs.add(caction.getJobId());
337                        }
338                    }
339                }
340                catch (Exception ex) {
341                    log.error("Exception, {0}", ex.getMessage(), ex);
342                }
343            }
344            runCoordActionRecoveryForReady(readyJobs);
345        }
346
347        /**
348         * Recover coordinator actions that are staying in READY too long
349         */
350        private void runCoordActionRecoveryForReady(Set<String> jobIds) {
351            XLog.Info.get().clear();
352            XLog log = XLog.getLog(getClass());
353            List<String> coordJobIds = new ArrayList<String>(jobIds);
354            try {
355                coordJobIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(coordJobIds);
356                msg.append(", COORD_READY_JOBS : " + coordJobIds.size());
357                for (String jobid : coordJobIds) {
358                    queueCallable(new CoordActionReadyXCommand(jobid));
359                    log.debug("Recover a coord action from [READY] resubmit CoordActionReadyXCommand :[{0}]", jobid);
360                }
361            }
362            catch (Exception ex) {
363                log.error("Exception, {0}", ex.getMessage(), ex);
364            }
365        }
366
367        /**
368         * Recover wf actions
369         */
370        private void runWFRecovery() {
371            XLog.Info.get().clear();
372            XLog log = XLog.getLog(getClass());
373            // queue command for action recovery
374
375            long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL)
376                    * ONE_DAY_MILLISCONDS;
377
378            List<WorkflowActionBean> actions = null;
379            try {
380                actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
381                        olderThan, createdTimeInterval);
382            }
383            catch (JPAExecutorException ex) {
384                log.warn("Exception while reading pending actions from storage", ex);
385                return;
386            }
387            // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
388            // actions.size());
389            msg.append(" WF_ACTIONS " + actions.size());
390
391            for (WorkflowActionBean action : actions) {
392                try {
393                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) {
394                        Services.get().get(InstrumentationService.class).get()
395                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
396                        if (action.getStatus() == WorkflowActionBean.Status.PREP
397                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
398                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
399                            log.debug("Recover a workflow action from [{0}] status and resubmit ActionStartXCommand :[{1}]",
400                                    action.getStatus(), action.getId());
401                        }
402                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
403                            Date nextRunTime = action.getPendingAge();
404                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
405                                    - System.currentTimeMillis());
406                            log.debug("Recover a workflow action from [START_RETRY] status and resubmit ActionStartXCommand :[{0}]"
407                                    , action.getId());
408                        }
409                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
410                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
411                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
412                            log.debug("Recover a workflow action from [{0}] status and resubmit ActionEndXCommand :[{1}]",
413                                    action.getStatus(), action.getId());
414                        }
415                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
416                            Date nextRunTime = action.getPendingAge();
417                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
418                                    - System.currentTimeMillis());
419                            log.debug("Recover a workflow action from [END_RETRY] status and resubmit ActionEndXCommand :[{0}]",
420                                    action.getId());
421                        }
422                        else if (action.getStatus() == WorkflowActionBean.Status.OK
423                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
424                            queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
425                            log.debug("Recover a workflow action from [{0}] status and resubmit SignalXCommand :[{1}]",
426                                    action.getStatus(), action.getId());
427                        }
428                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
429                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
430                            log.debug("Recover a workflow action from [USER_RETRY] status and resubmit ActionStartXCommand :[{0}]"
431                                    , action.getId());
432                        }
433                    }
434                }
435                catch (Exception ex) {
436                    log.error("Exception, {0}", ex.getMessage(), ex);
437                }
438            }
439
440        }
441
442        /**
443         * Adds callables to a list. If the number of callables in the list reaches {@link
444         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
445         *
446         * @param callable the callable to queue.
447         */
448        private void queueCallable(XCallable<?> callable) {
449            if (callables == null) {
450                callables = new ArrayList<XCallable<?>>();
451            }
452            callables.add(callable);
453            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
454                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
455                if (ret == false) {
456                    XLog.getLog(getClass()).warn(
457                            "Unable to queue the callables commands for RecoveryService. "
458                                    + "Most possibly command queue is full. Queue size is :"
459                                    + Services.get().get(CallableQueueService.class).queueSize());
460                }
461                callables = new ArrayList<XCallable<?>>();
462            }
463        }
464
465        /**
466         * Adds callables to a list. If the number of callables in the list reaches {@link
467         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
468         * of the callables in the list. The callables list and the delay is reset.
469         *
470         * @param callable the callable to queue.
471         * @param delay the delay for the callable.
472         */
473        private void queueCallable(XCallable<?> callable, long delay) {
474            if (delayedCallables == null) {
475                delayedCallables = new ArrayList<XCallable<?>>();
476            }
477            this.delay = Math.max(this.delay, delay);
478            delayedCallables.add(callable);
479            if (delayedCallables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)){
480                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
481                if (ret == false) {
482                    XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
483                            + "Most possibly Callable queue is full. Queue size is :"
484                            + Services.get().get(CallableQueueService.class).queueSize());
485                }
486                delayedCallables = new ArrayList<XCallable<?>>();
487                this.delay = 0;
488            }
489        }
490    }
491
492    /**
493     * Initializes the RecoveryService.
494     *
495     * @param services services instance.
496     */
497    @Override
498    public void init(Services services) {
499        Configuration conf = services.getConf();
500        Runnable recoveryRunnable = new RecoveryRunnable(
501                ConfigurationService.getInt(conf, CONF_WF_ACTIONS_OLDER_THAN),
502                ConfigurationService.getInt(conf, CONF_COORD_OLDER_THAN),
503                ConfigurationService.getInt(conf, CONF_BUNDLE_OLDER_THAN));
504        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
505                                                      SchedulerService.Unit.SEC);
506    }
507
508    public int getRecoveryServiceInterval(Configuration conf){
509        return ConfigurationService.getInt(conf, CONF_SERVICE_INTERVAL);
510    }
511
512    /**
513     * Destroy the Recovery Service.
514     */
515    @Override
516    public void destroy() {
517    }
518
519    /**
520     * Return the public interface for the Recovery Service.
521     *
522     * @return {@link RecoveryService}.
523     */
524    @Override
525    public Class<? extends Service> getInterface() {
526        return RecoveryService.class;
527    }
528
529    /**
530     * Merge Bundle job config and the configuration from the coord job to pass
531     * to Coord Engine
532     *
533     * @param coordElem the coordinator configuration
534     * @return Configuration merged configuration
535     * @throws CommandException thrown if failed to merge configuration
536     */
537    private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
538        XLog.Info.get().clear();
539        XLog log = XLog.getLog("RecoveryService");
540
541        String jobConf = bundleJob.getConf();
542        // Step 1: runConf = jobConf
543        Configuration runConf = null;
544        try {
545            runConf = new XConfiguration(new StringReader(jobConf));
546        }
547        catch (IOException e1) {
548            log.warn("Configuration parse error in:" + jobConf);
549            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
550        }
551        // Step 2: Merge local properties into runConf
552        // extract 'property' tags under 'configuration' block in the coordElem
553        // convert Element to XConfiguration
554        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
555
556        if (localConfigElement != null) {
557            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
558            Configuration localConf;
559            try {
560                localConf = new XConfiguration(new StringReader(strConfig));
561            }
562            catch (IOException e1) {
563                log.warn("Configuration parse error in:" + strConfig);
564                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
565            }
566
567            // copy configuration properties in the coordElem to the runConf
568            XConfiguration.copy(localConf, runConf);
569        }
570
571        // Step 3: Extract value of 'app-path' in coordElem, save it as a
572        // new property called 'oozie.coord.application.path', and normalize.
573        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
574        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
575        // Normalize coordinator appPath here;
576        try {
577            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
578        }
579        catch (IOException e) {
580            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
581        }
582        return runConf;
583    }
584}