001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.BundleActionBean;
028    import org.apache.oozie.BundleJobBean;
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.WorkflowActionBean;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.coord.CoordActionInputCheckCommand;
037    import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
038    import org.apache.oozie.command.coord.CoordActionReadyCommand;
039    import org.apache.oozie.command.coord.CoordActionReadyXCommand;
040    import org.apache.oozie.command.coord.CoordActionStartCommand;
041    import org.apache.oozie.command.coord.CoordActionStartXCommand;
042    import org.apache.oozie.command.coord.CoordKillXCommand;
043    import org.apache.oozie.command.coord.CoordResumeXCommand;
044    import org.apache.oozie.command.coord.CoordSubmitXCommand;
045    import org.apache.oozie.command.coord.CoordSuspendXCommand;
046    import org.apache.oozie.command.wf.ActionEndCommand;
047    import org.apache.oozie.command.wf.ActionEndXCommand;
048    import org.apache.oozie.command.wf.ActionStartCommand;
049    import org.apache.oozie.command.wf.ActionStartXCommand;
050    import org.apache.oozie.command.wf.KillXCommand;
051    import org.apache.oozie.command.wf.ResumeXCommand;
052    import org.apache.oozie.command.wf.SignalCommand;
053    import org.apache.oozie.command.wf.SignalXCommand;
054    import org.apache.oozie.command.wf.SuspendXCommand;
055    import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
056    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
057    import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
058    import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
059    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
060    import org.apache.oozie.executor.jpa.JPAExecutorException;
061    import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
062    import org.apache.oozie.util.JobUtils;
063    import org.apache.oozie.util.XCallable;
064    import org.apache.oozie.util.XConfiguration;
065    import org.apache.oozie.util.XLog;
066    import org.apache.oozie.util.XmlUtils;
067    import org.jdom.Attribute;
068    import org.jdom.Element;
069    import org.jdom.JDOMException;
070    
071    /**
 * The Recovery Service periodically looks for workflow actions, coordinator actions and bundle actions that have
 * been pending for longer than a configured age, and queues the commands needed to recover them.
074     */
075    public class RecoveryService implements Service {
076    
077        public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
078        public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
079        public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
080        public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
081        /**
082         * Time interval, in seconds, at which the recovery service will be scheduled to run.
083         */
084        public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
085        /**
086         * The number of callables to be queued in a batch.
087         */
088        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
089        /**
090         * Age of actions to queue, in seconds.
091         */
092        public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
093        /**
094         * Age of coordinator jobs to recover, in seconds.
095         */
096        public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
097    
098        /**
099         * Age of Bundle jobs to recover, in seconds.
100         */
101        public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
102    
103        private static final String INSTRUMENTATION_GROUP = "recovery";
104        private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
105        private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
106        private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
107    
108        private static boolean useXCommand = true;
109    
110    
111        /**
112         * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
113         * queuing of commands.
114         */
115        static class RecoveryRunnable implements Runnable {
116            private final long olderThan;
117            private final long coordOlderThan;
118            private final long bundleOlderThan;
119            private long delay = 0;
120            private List<XCallable<?>> callables;
121            private List<XCallable<?>> delayedCallables;
122            private StringBuilder msg = null;
123            private JPAService jpaService = null;
124    
125            public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
126                this.olderThan = olderThan;
127                this.coordOlderThan = coordOlderThan;
128                this.bundleOlderThan = bundleOlderThan;
129            }
130    
131            public void run() {
132                XLog.Info.get().clear();
133                XLog log = XLog.getLog(getClass());
134                msg = new StringBuilder();
135                jpaService = Services.get().get(JPAService.class);
136                runWFRecovery();
137                runCoordActionRecovery();
138                runCoordActionRecoveryForReady();
139                runBundleRecovery();
140                log.debug("QUEUING [{0}] for potential recovery", msg.toString());
141                boolean ret = false;
142                if (null != callables) {
143                    ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
144                    if (ret == false) {
145                        log.warn("Unable to queue the callables commands for RecoveryService. "
146                                + "Most possibly command queue is full. Queue size is :"
147                                + Services.get().get(CallableQueueService.class).queueSize());
148                    }
149                    callables = null;
150                }
151                if (null != delayedCallables) {
152                    ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
153                    if (ret == false) {
154                        log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
155                                + "Most possibly Callable queue is full. Queue size is :"
156                                + Services.get().get(CallableQueueService.class).queueSize());
157                    }
158                    delayedCallables = null;
159                    this.delay = 0;
160                }
161            }
162    
163            private void runBundleRecovery(){
164                XLog.Info.get().clear();
165                XLog log = XLog.getLog(getClass());
166    
167                try {
168                    List<BundleActionBean> bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
169                    msg.append(", BUNDLE_ACTIONS : " + bactions.size());
170                    for (BundleActionBean baction : bactions) {
171                        Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
172                                INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
173                        if(baction.getStatus() == Job.Status.PREP){
174                            BundleJobBean bundleJob = null;
175                            try {
176                                if (jpaService != null) {
177                                    bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
178                                }
179                                if(bundleJob != null){
180                                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
181                                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
182                                    for (Element coordElem : coordElems) {
183                                        Attribute name = coordElem.getAttribute("name");
184                                        if (name.getValue().equals(baction.getCoordName())) {
185                                            Configuration coordConf = mergeConfig(coordElem,bundleJob);
186                                            coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
187                                            queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
188                                        }
189                                    }
190                                }
191                            }
192                            catch (JDOMException jex) {
193                                throw new CommandException(ErrorCode.E1301, jex);
194                            }
195                            catch (JPAExecutorException je) {
196                                throw new CommandException(je);
197                            }
198                        }
199                        else if(baction.getStatus() == Job.Status.KILLED){
200                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
201                        }
202                        else if(baction.getStatus() == Job.Status.SUSPENDED){
203                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
204                        }
205                        else if(baction.getStatus() == Job.Status.RUNNING){
206                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
207                        }
208                    }
209                }
210                catch (Exception ex) {
211                    log.error("Exception, {0}", ex.getMessage(), ex);
212                }
213            }
214    
215            /**
216             * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
217             */
218            private void runCoordActionRecovery() {
219                XLog.Info.get().clear();
220                XLog log = XLog.getLog(getClass());
221    
222                try {
223                    List<CoordinatorActionBean> cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
224                    msg.append(", COORD_ACTIONS : " + cactions.size());
225                    for (CoordinatorActionBean caction : cactions) {
226                        Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
227                                                                                    INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
228                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
229                            if (useXCommand) {
230                                queueCallable(new CoordActionInputCheckXCommand(caction.getId()));
231                            } else {
232                                queueCallable(new CoordActionInputCheckCommand(caction.getId()));
233                            }
234    
235                            log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :" + caction.getId());
236                        }
237                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
238                            CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
239    
240                            if (useXCommand) {
241                                queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob
242                                        .getAuthToken()));
243                            } else {
244                                queueCallable(new CoordActionStartCommand(caction.getId(), coordJob.getUser(), coordJob
245                                        .getAuthToken()));
246                            }
247    
248                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" + caction.getId());
249                        }
250                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
251                            if (caction.getExternalId() != null) {
252                                queueCallable(new SuspendXCommand(caction.getExternalId()));
253                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" + caction.getId());
254                            }
255                        }
256                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
257                            if (caction.getExternalId() != null) {
258                                queueCallable(new KillXCommand(caction.getExternalId()));
259                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
260                            }
261                        }
262                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
263                            if (caction.getExternalId() != null) {
264                                queueCallable(new ResumeXCommand(caction.getExternalId()));
265                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
266                            }
267                        }
268                    }
269                }
270                catch (Exception ex) {
271                    log.error("Exception, {0}", ex.getMessage(), ex);
272                }
273            }
274    
275            /**
276             * Recover coordinator actions that are staying in READY too long
277             */
278            private void runCoordActionRecoveryForReady() {
279                XLog.Info.get().clear();
280                XLog log = XLog.getLog(getClass());
281    
282                try {
283                    List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
284                    msg.append(", COORD_READY_JOBS : " + jobids.size());
285                    for (String jobid : jobids) {
286                        if (useXCommand) {
287                            queueCallable(new CoordActionReadyXCommand(jobid));
288                        } else {
289                            queueCallable(new CoordActionReadyCommand(jobid));
290                        }
291    
292                        log.info("Recover READY coord actions for jobid :" + jobid);
293                    }
294                }
295                catch (Exception ex) {
296                    log.error("Exception, {0}", ex.getMessage(), ex);
297                }
298            }
299    
300            /**
301             * Recover wf actions
302             */
303            private void runWFRecovery() {
304                XLog.Info.get().clear();
305                XLog log = XLog.getLog(getClass());
306                // queue command for action recovery
307                try {
308                    List<WorkflowActionBean> actions = null;
309                    try {
310                        actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
311                    }
312                    catch (JPAExecutorException ex) {
313                        log.warn("Exception while reading pending actions from storage", ex);
314                    }
315                    //log.debug("QUEUING[{0}] pending wf actions for potential recovery", actions.size());
316                    msg.append(" WF_ACTIONS " + actions.size());
317    
318                    for (WorkflowActionBean action : actions) {
319                        Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
320                                INSTR_RECOVERED_ACTIONS_COUNTER, 1);
321                        if (action.getStatus() == WorkflowActionBean.Status.PREP
322                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
323    
324                            if (useXCommand) {
325                                queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
326                            } else {
327                                queueCallable(new ActionStartCommand(action.getId(), action.getType()));
328                            }
329    
330                        }
331                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
332                            Date nextRunTime = action.getPendingAge();
333                            if (useXCommand) {
334                                queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
335                                        - System.currentTimeMillis());
336                            } else {
337                                queueCallable(new ActionStartCommand(action.getId(), action.getType()), nextRunTime.getTime()
338                                        - System.currentTimeMillis());
339                            }
340    
341                        }
342                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
343                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
344                            if (useXCommand) {
345                                queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
346                            } else {
347                                queueCallable(new ActionEndCommand(action.getId(), action.getType()));
348                            }
349    
350                        }
351                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
352                            Date nextRunTime = action.getPendingAge();
353                            if (useXCommand) {
354                                queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
355                                        - System.currentTimeMillis());
356                            } else {
357                                queueCallable(new ActionEndCommand(action.getId(), action.getType()), nextRunTime.getTime()
358                                        - System.currentTimeMillis());
359                            }
360    
361                        }
362                        else if (action.getStatus() == WorkflowActionBean.Status.OK
363                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
364                            if (useXCommand) {
365                                queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
366                            } else {
367                                queueCallable(new SignalCommand(action.getJobId(), action.getId()));
368                            }
369    
370                        }
371                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
372                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
373                        }
374                    }
375                }
376                catch (Exception ex) {
377                    log.error("Exception, {0}", ex.getMessage(), ex);
378                }
379            }
380    
381            /**
382             * Adds callables to a list. If the number of callables in the list reaches {@link
383             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
384             *
385             * @param callable the callable to queue.
386             */
387            private void queueCallable(XCallable<?> callable) {
388                if (callables == null) {
389                    callables = new ArrayList<XCallable<?>>();
390                }
391                callables.add(callable);
392                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
393                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
394                    if (ret == false) {
395                        XLog.getLog(getClass()).warn(
396                                "Unable to queue the callables commands for RecoveryService. "
397                                        + "Most possibly command queue is full. Queue size is :"
398                                        + Services.get().get(CallableQueueService.class).queueSize());
399                    }
400                    callables = new ArrayList<XCallable<?>>();
401                }
402            }
403    
404            /**
405             * Adds callables to a list. If the number of callables in the list reaches {@link
406             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
407             * of the callables in the list. The callables list and the delay is reset.
408             *
409             * @param callable the callable to queue.
410             * @param delay the delay for the callable.
411             */
412            private void queueCallable(XCallable<?> callable, long delay) {
413                if (delayedCallables == null) {
414                    delayedCallables = new ArrayList<XCallable<?>>();
415                }
416                this.delay = Math.max(this.delay, delay);
417                delayedCallables.add(callable);
418                if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
419                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
420                    if (ret == false) {
421                        XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
422                                + "Most possibly Callable queue is full. Queue size is :"
423                                + Services.get().get(CallableQueueService.class).queueSize());
424                    }
425                    delayedCallables = new ArrayList<XCallable<?>>();
426                    this.delay = 0;
427                }
428            }
429        }
430    
431        /**
432         * Initializes the RecoveryService.
433         *
434         * @param services services instance.
435         */
436        @Override
437        public void init(Services services) {
438            Configuration conf = services.getConf();
439            Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
440                    CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
441            services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
442                                                          SchedulerService.Unit.SEC);
443    
444            if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
445                useXCommand = false;
446            }
447        }
448    
449        /**
450         * Destroy the Recovery Service.
451         */
452        @Override
453        public void destroy() {
454        }
455    
456        /**
457         * Return the public interface for the Recovery Service.
458         *
459         * @return {@link RecoveryService}.
460         */
461        @Override
462        public Class<? extends Service> getInterface() {
463            return RecoveryService.class;
464        }
465    
466        /**
467         * Merge Bundle job config and the configuration from the coord job to pass
468         * to Coord Engine
469         *
470         * @param coordElem the coordinator configuration
471         * @return Configuration merged configuration
472         * @throws CommandException thrown if failed to merge configuration
473         */
474        private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
475            XLog.Info.get().clear();
476            XLog log = XLog.getLog("RecoveryService");
477    
478            String jobConf = bundleJob.getConf();
479            // Step 1: runConf = jobConf
480            Configuration runConf = null;
481            try {
482                runConf = new XConfiguration(new StringReader(jobConf));
483            }
484            catch (IOException e1) {
485                log.warn("Configuration parse error in:" + jobConf);
486                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
487            }
488            // Step 2: Merge local properties into runConf
489            // extract 'property' tags under 'configuration' block in the coordElem
490            // convert Element to XConfiguration
491            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
492    
493            if (localConfigElement != null) {
494                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
495                Configuration localConf;
496                try {
497                    localConf = new XConfiguration(new StringReader(strConfig));
498                }
499                catch (IOException e1) {
500                    log.warn("Configuration parse error in:" + strConfig);
501                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
502                }
503    
504                // copy configuration properties in the coordElem to the runConf
505                XConfiguration.copy(localConf, runConf);
506            }
507    
508            // Step 3: Extract value of 'app-path' in coordElem, save it as a
509            // new property called 'oozie.coord.application.path', and normalize.
510            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
511            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
512            // Normalize coordinator appPath here;
513            try {
514                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
515            }
516            catch (IOException e) {
517                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
518            }
519            return runConf;
520        }
521    }