001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.BundleActionBean;
028    import org.apache.oozie.BundleJobBean;
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.WorkflowActionBean;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037    import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038    import org.apache.oozie.command.coord.CoordActionStartXCommand;
039    import org.apache.oozie.command.coord.CoordKillXCommand;
040    import org.apache.oozie.command.coord.CoordResumeXCommand;
041    import org.apache.oozie.command.coord.CoordSubmitXCommand;
042    import org.apache.oozie.command.coord.CoordSuspendXCommand;
043    import org.apache.oozie.command.wf.ActionEndXCommand;
044    import org.apache.oozie.command.wf.ActionStartXCommand;
045    import org.apache.oozie.command.wf.KillXCommand;
046    import org.apache.oozie.command.wf.ResumeXCommand;
047    import org.apache.oozie.command.wf.SignalXCommand;
048    import org.apache.oozie.command.wf.SuspendXCommand;
049    import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
050    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
051    import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
052    import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
053    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
054    import org.apache.oozie.executor.jpa.JPAExecutorException;
055    import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
056    import org.apache.oozie.util.JobUtils;
057    import org.apache.oozie.util.XCallable;
058    import org.apache.oozie.util.XConfiguration;
059    import org.apache.oozie.util.XLog;
060    import org.apache.oozie.util.XmlUtils;
061    import org.jdom.Attribute;
062    import org.jdom.Element;
063    import org.jdom.JDOMException;
064    
065    /**
066     * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
067     * queues them for execution.
068     */
069    public class RecoveryService implements Service {
070    
071        public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
072        public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
073        public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
074        public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
075        /**
076         * Time interval, in seconds, at which the recovery service will be scheduled to run.
077         */
078        public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
079        /**
080         * The number of callables to be queued in a batch.
081         */
082        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
083        /**
084         * Age of actions to queue, in seconds.
085         */
086        public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
087        /**
088         * Age of coordinator jobs to recover, in seconds.
089         */
090        public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
091    
092        /**
093         * Age of Bundle jobs to recover, in seconds.
094         */
095        public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
096    
097        private static final String INSTRUMENTATION_GROUP = "recovery";
098        private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
099        private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
100        private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
101    
102    
103        /**
104         * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
105         * queuing of commands.
106         */
107        static class RecoveryRunnable implements Runnable {
108            private final long olderThan;
109            private final long coordOlderThan;
110            private final long bundleOlderThan;
111            private long delay = 0;
112            private List<XCallable<?>> callables;
113            private List<XCallable<?>> delayedCallables;
114            private StringBuilder msg = null;
115            private JPAService jpaService = null;
116    
117            public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
118                this.olderThan = olderThan;
119                this.coordOlderThan = coordOlderThan;
120                this.bundleOlderThan = bundleOlderThan;
121            }
122    
123            public void run() {
124                XLog.Info.get().clear();
125                XLog log = XLog.getLog(getClass());
126                msg = new StringBuilder();
127                jpaService = Services.get().get(JPAService.class);
128                runWFRecovery();
129                runCoordActionRecovery();
130                runCoordActionRecoveryForReady();
131                runBundleRecovery();
132                log.debug("QUEUING [{0}] for potential recovery", msg.toString());
133                boolean ret = false;
134                if (null != callables) {
135                    ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
136                    if (ret == false) {
137                        log.warn("Unable to queue the callables commands for RecoveryService. "
138                                + "Most possibly command queue is full. Queue size is :"
139                                + Services.get().get(CallableQueueService.class).queueSize());
140                    }
141                    callables = null;
142                }
143                if (null != delayedCallables) {
144                    ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
145                    if (ret == false) {
146                        log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
147                                + "Most possibly Callable queue is full. Queue size is :"
148                                + Services.get().get(CallableQueueService.class).queueSize());
149                    }
150                    delayedCallables = null;
151                    this.delay = 0;
152                }
153            }
154    
155            private void runBundleRecovery(){
156                XLog.Info.get().clear();
157                XLog log = XLog.getLog(getClass());
158    
159                try {
160                    List<BundleActionBean> bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
161                    msg.append(", BUNDLE_ACTIONS : " + bactions.size());
162                    for (BundleActionBean baction : bactions) {
163                        Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
164                                INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
165                        if(baction.getStatus() == Job.Status.PREP){
166                            BundleJobBean bundleJob = null;
167                            try {
168                                if (jpaService != null) {
169                                    bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
170                                }
171                                if(bundleJob != null){
172                                    Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
173                                    List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
174                                    for (Element coordElem : coordElems) {
175                                        Attribute name = coordElem.getAttribute("name");
176                                        if (name.getValue().equals(baction.getCoordName())) {
177                                            Configuration coordConf = mergeConfig(coordElem,bundleJob);
178                                            coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
179                                            queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
180                                        }
181                                    }
182                                }
183                            }
184                            catch (JDOMException jex) {
185                                throw new CommandException(ErrorCode.E1301, jex);
186                            }
187                            catch (JPAExecutorException je) {
188                                throw new CommandException(je);
189                            }
190                        }
191                        else if(baction.getStatus() == Job.Status.KILLED){
192                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
193                        }
194                        else if(baction.getStatus() == Job.Status.SUSPENDED){
195                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
196                        }
197                        else if(baction.getStatus() == Job.Status.RUNNING){
198                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
199                        }
200                    }
201                }
202                catch (Exception ex) {
203                    log.error("Exception, {0}", ex.getMessage(), ex);
204                }
205            }
206    
207            /**
208             * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
209             */
210            private void runCoordActionRecovery() {
211                XLog.Info.get().clear();
212                XLog log = XLog.getLog(getClass());
213    
214                try {
215                    List<CoordinatorActionBean> cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
216                    msg.append(", COORD_ACTIONS : " + cactions.size());
217                    for (CoordinatorActionBean caction : cactions) {
218                        Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
219                                                                                    INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
220                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
221                                                    queueCallable(new CoordActionInputCheckXCommand(
222                                                                    caction.getId(), caction.getJobId()));
223    
224                                                    log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :"
225                                                                    + caction.getId());
226                        }
227                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
228                            CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
229                                                    queueCallable(new CoordActionStartXCommand(
230                                                                    caction.getId(), coordJob.getUser(),
231                                                                    coordJob.getAuthToken(), caction.getJobId()));
232    
233                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" + caction.getId());
234                        }
235                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
236                            if (caction.getExternalId() != null) {
237                                queueCallable(new SuspendXCommand(caction.getExternalId()));
238                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" + caction.getId());
239                            }
240                        }
241                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
242                            if (caction.getExternalId() != null) {
243                                queueCallable(new KillXCommand(caction.getExternalId()));
244                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
245                            }
246                        }
247                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
248                            if (caction.getExternalId() != null) {
249                                queueCallable(new ResumeXCommand(caction.getExternalId()));
250                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
251                            }
252                        }
253                    }
254                }
255                catch (Exception ex) {
256                    log.error("Exception, {0}", ex.getMessage(), ex);
257                }
258            }
259    
260            /**
261             * Recover coordinator actions that are staying in READY too long
262             */
263            private void runCoordActionRecoveryForReady() {
264                XLog.Info.get().clear();
265                XLog log = XLog.getLog(getClass());
266    
267                try {
268                    List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
269                    msg.append(", COORD_READY_JOBS : " + jobids.size());
270                    for (String jobid : jobids) {
271                            queueCallable(new CoordActionReadyXCommand(jobid));
272    
273                        log.info("Recover READY coord actions for jobid :" + jobid);
274                    }
275                }
276                catch (Exception ex) {
277                    log.error("Exception, {0}", ex.getMessage(), ex);
278                }
279            }
280    
281            /**
282             * Recover wf actions
283             */
284            private void runWFRecovery() {
285                XLog.Info.get().clear();
286                XLog log = XLog.getLog(getClass());
287                // queue command for action recovery
288                try {
289                    List<WorkflowActionBean> actions = null;
290                    try {
291                        actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
292                    }
293                    catch (JPAExecutorException ex) {
294                        log.warn("Exception while reading pending actions from storage", ex);
295                    }
296                    //log.debug("QUEUING[{0}] pending wf actions for potential recovery", actions.size());
297                    msg.append(" WF_ACTIONS " + actions.size());
298    
299                    for (WorkflowActionBean action : actions) {
300                        Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
301                                INSTR_RECOVERED_ACTIONS_COUNTER, 1);
302                        if (action.getStatus() == WorkflowActionBean.Status.PREP
303                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
304                                                    queueCallable(new ActionStartXCommand(action.getId(),
305                                                                    action.getType()));
306                        }
307                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
308                            Date nextRunTime = action.getPendingAge();
309                                queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
310                                        - System.currentTimeMillis());
311                        }
312                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
313                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
314                                queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
315                        }
316                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
317                            Date nextRunTime = action.getPendingAge();
318                                                    queueCallable(new ActionEndXCommand(action.getId(),
319                                                                    action.getType()), nextRunTime.getTime()
320                                                                    - System.currentTimeMillis());
321    
322                        }
323                        else if (action.getStatus() == WorkflowActionBean.Status.OK
324                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
325                                                    queueCallable(new SignalXCommand(action.getJobId(),
326                                                                    action.getId()));
327                        }
328                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
329                                                    queueCallable(new ActionStartXCommand(action.getId(),
330                                                                    action.getType()));
331                        }
332                    }
333                }
334                catch (Exception ex) {
335                    log.error("Exception, {0}", ex.getMessage(), ex);
336                }
337            }
338    
339            /**
340             * Adds callables to a list. If the number of callables in the list reaches {@link
341             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
342             *
343             * @param callable the callable to queue.
344             */
345            private void queueCallable(XCallable<?> callable) {
346                if (callables == null) {
347                    callables = new ArrayList<XCallable<?>>();
348                }
349                callables.add(callable);
350                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
351                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
352                    if (ret == false) {
353                        XLog.getLog(getClass()).warn(
354                                "Unable to queue the callables commands for RecoveryService. "
355                                        + "Most possibly command queue is full. Queue size is :"
356                                        + Services.get().get(CallableQueueService.class).queueSize());
357                    }
358                    callables = new ArrayList<XCallable<?>>();
359                }
360            }
361    
362            /**
363             * Adds callables to a list. If the number of callables in the list reaches {@link
364             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
365             * of the callables in the list. The callables list and the delay is reset.
366             *
367             * @param callable the callable to queue.
368             * @param delay the delay for the callable.
369             */
370            private void queueCallable(XCallable<?> callable, long delay) {
371                if (delayedCallables == null) {
372                    delayedCallables = new ArrayList<XCallable<?>>();
373                }
374                this.delay = Math.max(this.delay, delay);
375                delayedCallables.add(callable);
376                if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
377                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
378                    if (ret == false) {
379                        XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
380                                + "Most possibly Callable queue is full. Queue size is :"
381                                + Services.get().get(CallableQueueService.class).queueSize());
382                    }
383                    delayedCallables = new ArrayList<XCallable<?>>();
384                    this.delay = 0;
385                }
386            }
387        }
388    
389        /**
390         * Initializes the RecoveryService.
391         *
392         * @param services services instance.
393         */
394        @Override
395        public void init(Services services) {
396            Configuration conf = services.getConf();
397            Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
398                    CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
399            services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
400                                                          SchedulerService.Unit.SEC);
401        }
402    
403        /**
404         * Destroy the Recovery Service.
405         */
406        @Override
407        public void destroy() {
408        }
409    
410        /**
411         * Return the public interface for the Recovery Service.
412         *
413         * @return {@link RecoveryService}.
414         */
415        @Override
416        public Class<? extends Service> getInterface() {
417            return RecoveryService.class;
418        }
419    
420        /**
421         * Merge Bundle job config and the configuration from the coord job to pass
422         * to Coord Engine
423         *
424         * @param coordElem the coordinator configuration
425         * @return Configuration merged configuration
426         * @throws CommandException thrown if failed to merge configuration
427         */
428        private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
429            XLog.Info.get().clear();
430            XLog log = XLog.getLog("RecoveryService");
431    
432            String jobConf = bundleJob.getConf();
433            // Step 1: runConf = jobConf
434            Configuration runConf = null;
435            try {
436                runConf = new XConfiguration(new StringReader(jobConf));
437            }
438            catch (IOException e1) {
439                log.warn("Configuration parse error in:" + jobConf);
440                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
441            }
442            // Step 2: Merge local properties into runConf
443            // extract 'property' tags under 'configuration' block in the coordElem
444            // convert Element to XConfiguration
445            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
446    
447            if (localConfigElement != null) {
448                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
449                Configuration localConf;
450                try {
451                    localConf = new XConfiguration(new StringReader(strConfig));
452                }
453                catch (IOException e1) {
454                    log.warn("Configuration parse error in:" + strConfig);
455                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
456                }
457    
458                // copy configuration properties in the coordElem to the runConf
459                XConfiguration.copy(localConf, runConf);
460            }
461    
462            // Step 3: Extract value of 'app-path' in coordElem, save it as a
463            // new property called 'oozie.coord.application.path', and normalize.
464            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
465            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
466            // Normalize coordinator appPath here;
467            try {
468                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
469            }
470            catch (IOException e) {
471                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
472            }
473            return runConf;
474        }
475    }