001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.BundleActionBean;
028import org.apache.oozie.BundleJobBean;
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.CoordinatorJobBean;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.WorkflowActionBean;
033import org.apache.oozie.client.Job;
034import org.apache.oozie.client.OozieClient;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038import org.apache.oozie.command.coord.CoordActionStartXCommand;
039import org.apache.oozie.command.coord.CoordKillXCommand;
040import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
041import org.apache.oozie.command.coord.CoordResumeXCommand;
042import org.apache.oozie.command.coord.CoordSubmitXCommand;
043import org.apache.oozie.command.coord.CoordSuspendXCommand;
044import org.apache.oozie.command.wf.ActionEndXCommand;
045import org.apache.oozie.command.wf.ActionStartXCommand;
046import org.apache.oozie.command.wf.KillXCommand;
047import org.apache.oozie.command.wf.ResumeXCommand;
048import org.apache.oozie.command.wf.SignalXCommand;
049import org.apache.oozie.command.wf.SuspendXCommand;
050import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
051import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
052import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
053import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
054import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
055import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
056import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
057import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
058import org.apache.oozie.executor.jpa.JPAExecutorException;
059import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
060import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
061import org.apache.oozie.util.JobUtils;
062import org.apache.oozie.util.XCallable;
063import org.apache.oozie.util.XConfiguration;
064import org.apache.oozie.util.XLog;
065import org.apache.oozie.util.XmlUtils;
066import org.jdom.Attribute;
067import org.jdom.Element;
068
069/**
 * The Recovery Service looks for workflow actions, coordinator actions and bundle actions that have been stuck for
 * longer than a configured age and queues the commands needed to move them forward again.
072 */
073public class RecoveryService implements Service {
074
075    public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
076    public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
077    public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
078    public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
079    /**
080     * Time interval, in seconds, at which the recovery service will be scheduled to run.
081     */
082    public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
083    /**
084     * The number of callables to be queued in a batch.
085     */
086    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
087
088    /**
089     * Delay for the push missing dependencies in milliseconds.
090     */
091    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = CONF_PREFIX + "push.dependency.interval";
092
093    /**
094     * Age of actions to queue, in seconds.
095     */
096    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
097    /**
098     * Age of coordinator jobs to recover, in seconds.
099     */
100    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
101
102    /**
103     * Age of Bundle jobs to recover, in seconds.
104     */
105    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
106
107    private static final String INSTRUMENTATION_GROUP = "recovery";
108    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
109    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
110    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
111
112
113    /**
114     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
115     * queuing of commands.
116     */
117    static class RecoveryRunnable implements Runnable {
118        private final long olderThan;
119        private final long coordOlderThan;
120        private final long bundleOlderThan;
121        private long delay = 0;
122        private List<XCallable<?>> callables;
123        private List<XCallable<?>> delayedCallables;
124        private StringBuilder msg = null;
125        private JPAService jpaService = null;
126
127        public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
128            this.olderThan = olderThan;
129            this.coordOlderThan = coordOlderThan;
130            this.bundleOlderThan = bundleOlderThan;
131        }
132
133        public void run() {
134            XLog.Info.get().clear();
135            XLog log = XLog.getLog(getClass());
136            msg = new StringBuilder();
137            jpaService = Services.get().get(JPAService.class);
138            runWFRecovery();
139            runCoordActionRecovery();
140            runCoordActionRecoveryForReady();
141            runBundleRecovery();
142            log.debug("QUEUING [{0}] for potential recovery", msg.toString());
143            boolean ret = false;
144            if (null != callables) {
145                ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
146                if (ret == false) {
147                    log.warn("Unable to queue the callables commands for RecoveryService. "
148                            + "Most possibly command queue is full. Queue size is :"
149                            + Services.get().get(CallableQueueService.class).queueSize());
150                }
151                callables = null;
152            }
153            if (null != delayedCallables) {
154                ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
155                if (ret == false) {
156                    log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
157                            + "Most possibly Callable queue is full. Queue size is :"
158                            + Services.get().get(CallableQueueService.class).queueSize());
159                }
160                delayedCallables = null;
161                this.delay = 0;
162            }
163        }
164
165        private void runBundleRecovery(){
166            XLog.Info.get().clear();
167            XLog log = XLog.getLog(getClass());
168            List<BundleActionBean> bactions = null;
169            try {
170                bactions = BundleActionQueryExecutor.getInstance().getList(
171                        BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan);
172            }
173            catch (JPAExecutorException ex) {
174                log.warn("Error reading bundle actions from database", ex);
175                return;
176            }
177            msg.append(", BUNDLE_ACTIONS : " + bactions.size());
178            for (BundleActionBean baction : bactions) {
179                try {
180                    Services.get().get(InstrumentationService.class).get()
181                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
182                    if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) {
183                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
184                        continue;
185                    }
186                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) {
187                        if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) {
188                            BundleJobBean bundleJob = null;
189                            if (jpaService != null) {
190                                bundleJob = BundleJobQueryExecutor.getInstance().get(
191                                        BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
192                            }
193                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
194                            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
195                            for (Element coordElem : coordElems) {
196                                Attribute name = coordElem.getAttribute("name");
197                                if (name.getValue().equals(baction.getCoordName())) {
198                                    Configuration coordConf = mergeConfig(coordElem, bundleJob);
199                                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
200                                    queueCallable(new CoordSubmitXCommand(coordConf,
201                                            bundleJob.getId(), name.getValue()));
202                                }
203                            }
204                        }
205                        else if (baction.getStatus() == Job.Status.KILLED) {
206                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
207                        }
208                        else if (baction.getStatus() == Job.Status.SUSPENDED
209                                || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
210                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
211                        }
212                        else if (baction.getStatus() == Job.Status.RUNNING
213                                || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
214                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
215                        }
216                    }
217                }
218                catch (Exception ex) {
219                    log.error("Exception, {0}", ex.getMessage(), ex);
220                }
221            }
222
223        }
224
225        /**
226         * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
227         */
228        private void runCoordActionRecovery() {
229            XLog.Info.get().clear();
230            XLog log = XLog.getLog(getClass());
231            long pushMissingDepInterval = Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_INTERVAL, 200);
232            long pushMissingDepDelay = pushMissingDepInterval;
233            List<CoordinatorActionBean> cactions = null;
234            try {
235                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
236            }
237            catch (JPAExecutorException ex) {
238                log.warn("Error reading coord actions from database", ex);
239                return;
240            }
241            msg.append(", COORD_ACTIONS : " + cactions.size());
242            for (CoordinatorActionBean caction : cactions) {
243                try {
244                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) {
245                        Services.get().get(InstrumentationService.class).get()
246                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
247                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
248                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
249                            log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
250                                    + caction.getId());
251                            if (caction.getPushMissingDependencies() != null
252                                    && caction.getPushMissingDependencies().length() != 0) {
253                                queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
254                                        pushMissingDepDelay);
255                                pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
256                                log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
257                                        + caction.getId());
258                            }
259                        }
260                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
261                            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
262                                    CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId());
263                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
264                                    coordJob.getAppName(), caction.getJobId()));
265
266                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
267                                    + caction.getId());
268                        }
269                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
270                            if (caction.getExternalId() != null && caction.getPending() > 1) {
271                                queueCallable(new SuspendXCommand(caction.getExternalId()));
272                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
273                                        + caction.getId());
274                            }
275                        }
276                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
277                            if (caction.getExternalId() != null) {
278                                queueCallable(new KillXCommand(caction.getExternalId()));
279                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
280                            }
281                        }
282                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
283                            if (caction.getExternalId() != null) {
284                                queueCallable(new ResumeXCommand(caction.getExternalId()));
285                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
286                            }
287                        }
288                    }
289                }
290                catch (Exception ex) {
291                    log.error("Exception, {0}", ex.getMessage(), ex);
292                }
293            }
294
295
296        }
297
298        /**
299         * Recover coordinator actions that are staying in READY too long
300         */
301        private void runCoordActionRecoveryForReady() {
302            XLog.Info.get().clear();
303            XLog log = XLog.getLog(getClass());
304
305            try {
306                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
307                jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids);
308                msg.append(", COORD_READY_JOBS : " + jobids.size());
309                for (String jobid : jobids) {
310                        queueCallable(new CoordActionReadyXCommand(jobid));
311
312                    log.info("Recover READY coord actions for jobid :" + jobid);
313                }
314            }
315            catch (Exception ex) {
316                log.error("Exception, {0}", ex.getMessage(), ex);
317            }
318        }
319
320        /**
321         * Recover wf actions
322         */
323        private void runWFRecovery() {
324            XLog.Info.get().clear();
325            XLog log = XLog.getLog(getClass());
326            // queue command for action recovery
327            List<WorkflowActionBean> actions = null;
328            try {
329                actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
330                        olderThan);
331            }
332            catch (JPAExecutorException ex) {
333                log.warn("Exception while reading pending actions from storage", ex);
334                return;
335            }
336            // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
337            // actions.size());
338            msg.append(" WF_ACTIONS " + actions.size());
339
340            for (WorkflowActionBean action : actions) {
341                try {
342                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) {
343                        Services.get().get(InstrumentationService.class).get()
344                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
345                        if (action.getStatus() == WorkflowActionBean.Status.PREP
346                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
347                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
348                        }
349                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
350                            Date nextRunTime = action.getPendingAge();
351                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
352                                    - System.currentTimeMillis());
353                        }
354                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
355                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
356                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
357                        }
358                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
359                            Date nextRunTime = action.getPendingAge();
360                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
361                                    - System.currentTimeMillis());
362
363                        }
364                        else if (action.getStatus() == WorkflowActionBean.Status.OK
365                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
366                            queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
367                        }
368                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
369                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
370                        }
371                    }
372                }
373                catch (Exception ex) {
374                    log.error("Exception, {0}", ex.getMessage(), ex);
375                }
376            }
377
378        }
379
380        /**
381         * Adds callables to a list. If the number of callables in the list reaches {@link
382         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
383         *
384         * @param callable the callable to queue.
385         */
386        private void queueCallable(XCallable<?> callable) {
387            if (callables == null) {
388                callables = new ArrayList<XCallable<?>>();
389            }
390            callables.add(callable);
391            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
392                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
393                if (ret == false) {
394                    XLog.getLog(getClass()).warn(
395                            "Unable to queue the callables commands for RecoveryService. "
396                                    + "Most possibly command queue is full. Queue size is :"
397                                    + Services.get().get(CallableQueueService.class).queueSize());
398                }
399                callables = new ArrayList<XCallable<?>>();
400            }
401        }
402
403        /**
404         * Adds callables to a list. If the number of callables in the list reaches {@link
405         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
406         * of the callables in the list. The callables list and the delay is reset.
407         *
408         * @param callable the callable to queue.
409         * @param delay the delay for the callable.
410         */
411        private void queueCallable(XCallable<?> callable, long delay) {
412            if (delayedCallables == null) {
413                delayedCallables = new ArrayList<XCallable<?>>();
414            }
415            this.delay = Math.max(this.delay, delay);
416            delayedCallables.add(callable);
417            if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
418                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
419                if (ret == false) {
420                    XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
421                            + "Most possibly Callable queue is full. Queue size is :"
422                            + Services.get().get(CallableQueueService.class).queueSize());
423                }
424                delayedCallables = new ArrayList<XCallable<?>>();
425                this.delay = 0;
426            }
427        }
428    }
429
430    /**
431     * Initializes the RecoveryService.
432     *
433     * @param services services instance.
434     */
435    @Override
436    public void init(Services services) {
437        Configuration conf = services.getConf();
438        Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
439                CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
440        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
441                                                      SchedulerService.Unit.SEC);
442    }
443
444    public int getRecoveryServiceInterval(Configuration conf){
445        return conf.getInt(CONF_SERVICE_INTERVAL, 60);
446    }
447
448    /**
449     * Destroy the Recovery Service.
450     */
451    @Override
452    public void destroy() {
453    }
454
455    /**
456     * Return the public interface for the Recovery Service.
457     *
458     * @return {@link RecoveryService}.
459     */
460    @Override
461    public Class<? extends Service> getInterface() {
462        return RecoveryService.class;
463    }
464
465    /**
466     * Merge Bundle job config and the configuration from the coord job to pass
467     * to Coord Engine
468     *
469     * @param coordElem the coordinator configuration
470     * @return Configuration merged configuration
471     * @throws CommandException thrown if failed to merge configuration
472     */
473    private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
474        XLog.Info.get().clear();
475        XLog log = XLog.getLog("RecoveryService");
476
477        String jobConf = bundleJob.getConf();
478        // Step 1: runConf = jobConf
479        Configuration runConf = null;
480        try {
481            runConf = new XConfiguration(new StringReader(jobConf));
482        }
483        catch (IOException e1) {
484            log.warn("Configuration parse error in:" + jobConf);
485            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
486        }
487        // Step 2: Merge local properties into runConf
488        // extract 'property' tags under 'configuration' block in the coordElem
489        // convert Element to XConfiguration
490        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
491
492        if (localConfigElement != null) {
493            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
494            Configuration localConf;
495            try {
496                localConf = new XConfiguration(new StringReader(strConfig));
497            }
498            catch (IOException e1) {
499                log.warn("Configuration parse error in:" + strConfig);
500                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
501            }
502
503            // copy configuration properties in the coordElem to the runConf
504            XConfiguration.copy(localConf, runConf);
505        }
506
507        // Step 3: Extract value of 'app-path' in coordElem, save it as a
508        // new property called 'oozie.coord.application.path', and normalize.
509        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
510        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
511        // Normalize coordinator appPath here;
512        try {
513            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
514        }
515        catch (IOException e) {
516            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
517        }
518        return runConf;
519    }
520}