001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.BundleActionBean;
028    import org.apache.oozie.BundleJobBean;
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.WorkflowActionBean;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037    import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038    import org.apache.oozie.command.coord.CoordActionStartXCommand;
039    import org.apache.oozie.command.coord.CoordKillXCommand;
040    import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
041    import org.apache.oozie.command.coord.CoordResumeXCommand;
042    import org.apache.oozie.command.coord.CoordSubmitXCommand;
043    import org.apache.oozie.command.coord.CoordSuspendXCommand;
044    import org.apache.oozie.command.wf.ActionEndXCommand;
045    import org.apache.oozie.command.wf.ActionStartXCommand;
046    import org.apache.oozie.command.wf.KillXCommand;
047    import org.apache.oozie.command.wf.ResumeXCommand;
048    import org.apache.oozie.command.wf.SignalXCommand;
049    import org.apache.oozie.command.wf.SuspendXCommand;
050    import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
051    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
052    import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
053    import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
054    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
055    import org.apache.oozie.executor.jpa.JPAExecutorException;
056    import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
057    import org.apache.oozie.util.JobUtils;
058    import org.apache.oozie.util.XCallable;
059    import org.apache.oozie.util.XConfiguration;
060    import org.apache.oozie.util.XLog;
061    import org.apache.oozie.util.XmlUtils;
062    import org.jdom.Attribute;
063    import org.jdom.Element;
064    
065    /**
066     * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
067     * queues them for execution.
068     */
069    public class RecoveryService implements Service {
070    
071        public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
072        public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
073        public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
074        public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
075        /**
076         * Time interval, in seconds, at which the recovery service will be scheduled to run.
077         */
078        public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
079        /**
080         * The number of callables to be queued in a batch.
081         */
082        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
083    
084        /**
085         * Delay for the push missing dependencies in milliseconds.
086         */
087        public static final String CONF_PUSH_DEPENDENCY_INTERVAL = CONF_PREFIX + "push.dependency.interval";
088    
089        /**
090         * Age of actions to queue, in seconds.
091         */
092        public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
093        /**
094         * Age of coordinator jobs to recover, in seconds.
095         */
096        public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
097    
098        /**
099         * Age of Bundle jobs to recover, in seconds.
100         */
101        public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
102    
103        private static final String INSTRUMENTATION_GROUP = "recovery";
104        private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
105        private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
106        private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
107    
108    
109        /**
110         * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
111         * queuing of commands.
112         */
113        static class RecoveryRunnable implements Runnable {
114            private final long olderThan;
115            private final long coordOlderThan;
116            private final long bundleOlderThan;
117            private long delay = 0;
118            private List<XCallable<?>> callables;
119            private List<XCallable<?>> delayedCallables;
120            private StringBuilder msg = null;
121            private JPAService jpaService = null;
122    
123            public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
124                this.olderThan = olderThan;
125                this.coordOlderThan = coordOlderThan;
126                this.bundleOlderThan = bundleOlderThan;
127            }
128    
129            public void run() {
130                XLog.Info.get().clear();
131                XLog log = XLog.getLog(getClass());
132                msg = new StringBuilder();
133                jpaService = Services.get().get(JPAService.class);
134                runWFRecovery();
135                runCoordActionRecovery();
136                runCoordActionRecoveryForReady();
137                runBundleRecovery();
138                log.debug("QUEUING [{0}] for potential recovery", msg.toString());
139                boolean ret = false;
140                if (null != callables) {
141                    ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
142                    if (ret == false) {
143                        log.warn("Unable to queue the callables commands for RecoveryService. "
144                                + "Most possibly command queue is full. Queue size is :"
145                                + Services.get().get(CallableQueueService.class).queueSize());
146                    }
147                    callables = null;
148                }
149                if (null != delayedCallables) {
150                    ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
151                    if (ret == false) {
152                        log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
153                                + "Most possibly Callable queue is full. Queue size is :"
154                                + Services.get().get(CallableQueueService.class).queueSize());
155                    }
156                    delayedCallables = null;
157                    this.delay = 0;
158                }
159            }
160    
161            private void runBundleRecovery(){
162                XLog.Info.get().clear();
163                XLog log = XLog.getLog(getClass());
164                List<BundleActionBean> bactions = null;
165                try {
166                    bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
167                }
168                catch (JPAExecutorException ex) {
169                    log.warn("Error reading bundle actions from database", ex);
170                    return;
171                }
172                msg.append(", BUNDLE_ACTIONS : " + bactions.size());
173                for (BundleActionBean baction : bactions) {
174                    try {
175                        Services.get().get(InstrumentationService.class).get()
176                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
177                        if (baction.getCoordId() == null) {
178                            log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
179                            continue;
180                        }
181                        if (baction.getStatus() == Job.Status.PREP) {
182                            BundleJobBean bundleJob = null;
183    
184                            if (jpaService != null) {
185                                bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
186                            }
187                            if (bundleJob != null) {
188                                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
189                                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
190                                for (Element coordElem : coordElems) {
191                                    Attribute name = coordElem.getAttribute("name");
192                                    if (name.getValue().equals(baction.getCoordName())) {
193                                        Configuration coordConf = mergeConfig(coordElem, bundleJob);
194                                        coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
195                                        queueCallable(new CoordSubmitXCommand(coordConf,
196                                                bundleJob.getId(), name.getValue()));
197                                    }
198                                }
199                            }
200    
201                        }
202                        else if (baction.getStatus() == Job.Status.KILLED) {
203                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
204                        }
205                        else if (baction.getStatus() == Job.Status.SUSPENDED
206                                || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
207                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
208                        }
209                        else if (baction.getStatus() == Job.Status.RUNNING
210                                || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
211                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
212                        }
213                    }
214                    catch (Exception ex) {
215                        log.error("Exception, {0}", ex.getMessage(), ex);
216                    }
217                }
218    
219            }
220    
221            /**
222             * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
223             */
224            private void runCoordActionRecovery() {
225                XLog.Info.get().clear();
226                XLog log = XLog.getLog(getClass());
227                long pushMissingDepInterval = Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_INTERVAL, 200);
228                long pushMissingDepDelay = pushMissingDepInterval;
229                List<CoordinatorActionBean> cactions = null;
230                try {
231                    cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
232                }
233                catch (JPAExecutorException ex) {
234                    log.warn("Error reading coord actions from database", ex);
235                    return;
236                }
237                msg.append(", COORD_ACTIONS : " + cactions.size());
238                for (CoordinatorActionBean caction : cactions) {
239                    try {
240                        Services.get().get(InstrumentationService.class).get()
241                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
242                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
243                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
244                            log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
245                                    + caction.getId());
246                            if (caction.getPushMissingDependencies() != null
247                                    && caction.getPushMissingDependencies().length() != 0) {
248                                queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
249                                        pushMissingDepDelay);
250                                pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
251                                log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
252                                        + caction.getId());
253                            }
254                        }
255                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
256                            CoordinatorJobBean coordJob = jpaService
257                                    .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
258                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
259                                    coordJob.getAppName(), caction.getJobId()));
260    
261                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
262                                    + caction.getId());
263                        }
264                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
265                            if (caction.getExternalId() != null) {
266                                queueCallable(new SuspendXCommand(caction.getExternalId()));
267                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
268                                        + caction.getId());
269                            }
270                        }
271                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
272                            if (caction.getExternalId() != null) {
273                                queueCallable(new KillXCommand(caction.getExternalId()));
274                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
275                            }
276                        }
277                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
278                            if (caction.getExternalId() != null) {
279                                queueCallable(new ResumeXCommand(caction.getExternalId()));
280                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
281                            }
282                        }
283                    }
284                    catch (Exception ex) {
285                        log.error("Exception, {0}", ex.getMessage(), ex);
286                    }
287                }
288    
289    
290            }
291    
292            /**
293             * Recover coordinator actions that are staying in READY too long
294             */
295            private void runCoordActionRecoveryForReady() {
296                XLog.Info.get().clear();
297                XLog log = XLog.getLog(getClass());
298    
299                try {
300                    List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
301                    msg.append(", COORD_READY_JOBS : " + jobids.size());
302                    for (String jobid : jobids) {
303                            queueCallable(new CoordActionReadyXCommand(jobid));
304    
305                        log.info("Recover READY coord actions for jobid :" + jobid);
306                    }
307                }
308                catch (Exception ex) {
309                    log.error("Exception, {0}", ex.getMessage(), ex);
310                }
311            }
312    
313            /**
314             * Recover wf actions
315             */
316            private void runWFRecovery() {
317                XLog.Info.get().clear();
318                XLog log = XLog.getLog(getClass());
319                // queue command for action recovery
320                List<WorkflowActionBean> actions = null;
321                try {
322                    actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
323                }
324                catch (JPAExecutorException ex) {
325                    log.warn("Exception while reading pending actions from storage", ex);
326                    return;
327                }
328                // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
329                // actions.size());
330                msg.append(" WF_ACTIONS " + actions.size());
331    
332                for (WorkflowActionBean action : actions) {
333                    try {
334                        Services.get().get(InstrumentationService.class).get()
335                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
336                        if (action.getStatus() == WorkflowActionBean.Status.PREP
337                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
338                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
339                        }
340                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
341                            Date nextRunTime = action.getPendingAge();
342                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
343                                    - System.currentTimeMillis());
344                        }
345                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
346                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
347                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
348                        }
349                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
350                            Date nextRunTime = action.getPendingAge();
351                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
352                                    - System.currentTimeMillis());
353    
354                        }
355                        else if (action.getStatus() == WorkflowActionBean.Status.OK
356                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
357                            queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
358                        }
359                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
360                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
361                        }
362                    }
363                    catch (Exception ex) {
364                        log.error("Exception, {0}", ex.getMessage(), ex);
365                    }
366                }
367    
368            }
369    
370            /**
371             * Adds callables to a list. If the number of callables in the list reaches {@link
372             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
373             *
374             * @param callable the callable to queue.
375             */
376            private void queueCallable(XCallable<?> callable) {
377                if (callables == null) {
378                    callables = new ArrayList<XCallable<?>>();
379                }
380                callables.add(callable);
381                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
382                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
383                    if (ret == false) {
384                        XLog.getLog(getClass()).warn(
385                                "Unable to queue the callables commands for RecoveryService. "
386                                        + "Most possibly command queue is full. Queue size is :"
387                                        + Services.get().get(CallableQueueService.class).queueSize());
388                    }
389                    callables = new ArrayList<XCallable<?>>();
390                }
391            }
392    
393            /**
394             * Adds callables to a list. If the number of callables in the list reaches {@link
395             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
396             * of the callables in the list. The callables list and the delay is reset.
397             *
398             * @param callable the callable to queue.
399             * @param delay the delay for the callable.
400             */
401            private void queueCallable(XCallable<?> callable, long delay) {
402                if (delayedCallables == null) {
403                    delayedCallables = new ArrayList<XCallable<?>>();
404                }
405                this.delay = Math.max(this.delay, delay);
406                delayedCallables.add(callable);
407                if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
408                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
409                    if (ret == false) {
410                        XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
411                                + "Most possibly Callable queue is full. Queue size is :"
412                                + Services.get().get(CallableQueueService.class).queueSize());
413                    }
414                    delayedCallables = new ArrayList<XCallable<?>>();
415                    this.delay = 0;
416                }
417            }
418        }
419    
420        /**
421         * Initializes the RecoveryService.
422         *
423         * @param services services instance.
424         */
425        @Override
426        public void init(Services services) {
427            Configuration conf = services.getConf();
428            Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
429                    CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
430            services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
431                                                          SchedulerService.Unit.SEC);
432        }
433    
434        public int getRecoveryServiceInterval(Configuration conf){
435            return conf.getInt(CONF_SERVICE_INTERVAL, 60);
436        }
437    
438        /**
439         * Destroy the Recovery Service.
440         */
441        @Override
442        public void destroy() {
443        }
444    
445        /**
446         * Return the public interface for the Recovery Service.
447         *
448         * @return {@link RecoveryService}.
449         */
450        @Override
451        public Class<? extends Service> getInterface() {
452            return RecoveryService.class;
453        }
454    
455        /**
456         * Merge Bundle job config and the configuration from the coord job to pass
457         * to Coord Engine
458         *
459         * @param coordElem the coordinator configuration
460         * @return Configuration merged configuration
461         * @throws CommandException thrown if failed to merge configuration
462         */
463        private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
464            XLog.Info.get().clear();
465            XLog log = XLog.getLog("RecoveryService");
466    
467            String jobConf = bundleJob.getConf();
468            // Step 1: runConf = jobConf
469            Configuration runConf = null;
470            try {
471                runConf = new XConfiguration(new StringReader(jobConf));
472            }
473            catch (IOException e1) {
474                log.warn("Configuration parse error in:" + jobConf);
475                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
476            }
477            // Step 2: Merge local properties into runConf
478            // extract 'property' tags under 'configuration' block in the coordElem
479            // convert Element to XConfiguration
480            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
481    
482            if (localConfigElement != null) {
483                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
484                Configuration localConf;
485                try {
486                    localConf = new XConfiguration(new StringReader(strConfig));
487                }
488                catch (IOException e1) {
489                    log.warn("Configuration parse error in:" + strConfig);
490                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
491                }
492    
493                // copy configuration properties in the coordElem to the runConf
494                XConfiguration.copy(localConf, runConf);
495            }
496    
497            // Step 3: Extract value of 'app-path' in coordElem, save it as a
498            // new property called 'oozie.coord.application.path', and normalize.
499            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
500            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
501            // Normalize coordinator appPath here;
502            try {
503                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
504            }
505            catch (IOException e) {
506                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
507            }
508            return runConf;
509        }
510    }