001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.List;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.BundleActionBean;
028    import org.apache.oozie.BundleJobBean;
029    import org.apache.oozie.CoordinatorActionBean;
030    import org.apache.oozie.CoordinatorJobBean;
031    import org.apache.oozie.ErrorCode;
032    import org.apache.oozie.WorkflowActionBean;
033    import org.apache.oozie.client.Job;
034    import org.apache.oozie.client.OozieClient;
035    import org.apache.oozie.command.CommandException;
036    import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037    import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038    import org.apache.oozie.command.coord.CoordActionStartXCommand;
039    import org.apache.oozie.command.coord.CoordKillXCommand;
040    import org.apache.oozie.command.coord.CoordResumeXCommand;
041    import org.apache.oozie.command.coord.CoordSubmitXCommand;
042    import org.apache.oozie.command.coord.CoordSuspendXCommand;
043    import org.apache.oozie.command.wf.ActionEndXCommand;
044    import org.apache.oozie.command.wf.ActionStartXCommand;
045    import org.apache.oozie.command.wf.KillXCommand;
046    import org.apache.oozie.command.wf.ResumeXCommand;
047    import org.apache.oozie.command.wf.SignalXCommand;
048    import org.apache.oozie.command.wf.SuspendXCommand;
049    import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
050    import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
051    import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
052    import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
053    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
054    import org.apache.oozie.executor.jpa.JPAExecutorException;
055    import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
056    import org.apache.oozie.util.JobUtils;
057    import org.apache.oozie.util.XCallable;
058    import org.apache.oozie.util.XConfiguration;
059    import org.apache.oozie.util.XLog;
060    import org.apache.oozie.util.XmlUtils;
061    import org.jdom.Attribute;
062    import org.jdom.Element;
063    import org.jdom.JDOMException;
064    
065    /**
066     * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
067     * queues them for execution.
068     */
069    public class RecoveryService implements Service {
070    
071        public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
072        public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
073        public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
074        public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
075        /**
076         * Time interval, in seconds, at which the recovery service will be scheduled to run.
077         */
078        public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
079        /**
080         * The number of callables to be queued in a batch.
081         */
082        public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
083        /**
084         * Age of actions to queue, in seconds.
085         */
086        public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
087        /**
088         * Age of coordinator jobs to recover, in seconds.
089         */
090        public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
091    
092        /**
093         * Age of Bundle jobs to recover, in seconds.
094         */
095        public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
096    
097        private static final String INSTRUMENTATION_GROUP = "recovery";
098        private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
099        private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
100        private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
101    
102    
103        /**
104         * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
105         * queuing of commands.
106         */
107        static class RecoveryRunnable implements Runnable {
108            private final long olderThan;
109            private final long coordOlderThan;
110            private final long bundleOlderThan;
111            private long delay = 0;
112            private List<XCallable<?>> callables;
113            private List<XCallable<?>> delayedCallables;
114            private StringBuilder msg = null;
115            private JPAService jpaService = null;
116    
117            public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
118                this.olderThan = olderThan;
119                this.coordOlderThan = coordOlderThan;
120                this.bundleOlderThan = bundleOlderThan;
121            }
122    
123            public void run() {
124                XLog.Info.get().clear();
125                XLog log = XLog.getLog(getClass());
126                msg = new StringBuilder();
127                jpaService = Services.get().get(JPAService.class);
128                runWFRecovery();
129                runCoordActionRecovery();
130                runCoordActionRecoveryForReady();
131                runBundleRecovery();
132                log.debug("QUEUING [{0}] for potential recovery", msg.toString());
133                boolean ret = false;
134                if (null != callables) {
135                    ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
136                    if (ret == false) {
137                        log.warn("Unable to queue the callables commands for RecoveryService. "
138                                + "Most possibly command queue is full. Queue size is :"
139                                + Services.get().get(CallableQueueService.class).queueSize());
140                    }
141                    callables = null;
142                }
143                if (null != delayedCallables) {
144                    ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
145                    if (ret == false) {
146                        log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
147                                + "Most possibly Callable queue is full. Queue size is :"
148                                + Services.get().get(CallableQueueService.class).queueSize());
149                    }
150                    delayedCallables = null;
151                    this.delay = 0;
152                }
153            }
154    
155            private void runBundleRecovery(){
156                XLog.Info.get().clear();
157                XLog log = XLog.getLog(getClass());
158                List<BundleActionBean> bactions = null;
159                try {
160                    bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
161                }
162                catch (JPAExecutorException ex) {
163                    log.warn("Error reading bundle actions from database", ex);
164                    return;
165                }
166                msg.append(", BUNDLE_ACTIONS : " + bactions.size());
167                for (BundleActionBean baction : bactions) {
168                    try {
169                        Services.get().get(InstrumentationService.class).get()
170                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
171                        if (baction.getCoordId() == null) {
172                            log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
173                            continue;
174                        }
175                        if (baction.getStatus() == Job.Status.PREP) {
176                            BundleJobBean bundleJob = null;
177    
178                            if (jpaService != null) {
179                                bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
180                            }
181                            if (bundleJob != null) {
182                                Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
183                                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
184                                for (Element coordElem : coordElems) {
185                                    Attribute name = coordElem.getAttribute("name");
186                                    if (name.getValue().equals(baction.getCoordName())) {
187                                        Configuration coordConf = mergeConfig(coordElem, bundleJob);
188                                        coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
189                                        queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(),
190                                                bundleJob.getId(), name.getValue()));
191                                    }
192                                }
193                            }
194    
195                        }
196                        else if (baction.getStatus() == Job.Status.KILLED) {
197                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
198                        }
199                        else if (baction.getStatus() == Job.Status.SUSPENDED
200                                || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
201                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
202                        }
203                        else if (baction.getStatus() == Job.Status.RUNNING
204                                || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
205                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
206                        }
207                    }
208                    catch (Exception ex) {
209                        log.error("Exception, {0}", ex.getMessage(), ex);
210                    }
211                }
212    
213    
214            }
215    
216            /**
217             * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
218             */
219            private void runCoordActionRecovery() {
220                XLog.Info.get().clear();
221                XLog log = XLog.getLog(getClass());
222                List<CoordinatorActionBean> cactions = null;
223                try {
224                    cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
225                }
226                catch (JPAExecutorException ex) {
227                    log.warn("Error reading coord actions from database", ex);
228                    return;
229                }
230                msg.append(", COORD_ACTIONS : " + cactions.size());
231                for (CoordinatorActionBean caction : cactions) {
232                    try {
233                        Services.get().get(InstrumentationService.class).get()
234                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
235                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
236                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
237    
238                            log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :"
239                                    + caction.getId());
240                        }
241                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
242                            CoordinatorJobBean coordJob = jpaService
243                                    .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
244                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
245                                    coordJob.getAuthToken(), caction.getJobId()));
246    
247                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
248                                    + caction.getId());
249                        }
250                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
251                            if (caction.getExternalId() != null) {
252                                queueCallable(new SuspendXCommand(caction.getExternalId()));
253                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
254                                        + caction.getId());
255                            }
256                        }
257                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
258                            if (caction.getExternalId() != null) {
259                                queueCallable(new KillXCommand(caction.getExternalId()));
260                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
261                            }
262                        }
263                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
264                            if (caction.getExternalId() != null) {
265                                queueCallable(new ResumeXCommand(caction.getExternalId()));
266                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
267                            }
268                        }
269                    }
270                    catch (Exception ex) {
271                        log.error("Exception, {0}", ex.getMessage(), ex);
272                    }
273                }
274    
275    
276            }
277    
278            /**
279             * Recover coordinator actions that are staying in READY too long
280             */
281            private void runCoordActionRecoveryForReady() {
282                XLog.Info.get().clear();
283                XLog log = XLog.getLog(getClass());
284    
285                try {
286                    List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
287                    msg.append(", COORD_READY_JOBS : " + jobids.size());
288                    for (String jobid : jobids) {
289                            queueCallable(new CoordActionReadyXCommand(jobid));
290    
291                        log.info("Recover READY coord actions for jobid :" + jobid);
292                    }
293                }
294                catch (Exception ex) {
295                    log.error("Exception, {0}", ex.getMessage(), ex);
296                }
297            }
298    
299            /**
300             * Recover wf actions
301             */
302            private void runWFRecovery() {
303                XLog.Info.get().clear();
304                XLog log = XLog.getLog(getClass());
305                // queue command for action recovery
306                List<WorkflowActionBean> actions = null;
307                try {
308                    actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
309                }
310                catch (JPAExecutorException ex) {
311                    log.warn("Exception while reading pending actions from storage", ex);
312                    return;
313                }
314                // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
315                // actions.size());
316                msg.append(" WF_ACTIONS " + actions.size());
317    
318                for (WorkflowActionBean action : actions) {
319                    try {
320                        Services.get().get(InstrumentationService.class).get()
321                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
322                        if (action.getStatus() == WorkflowActionBean.Status.PREP
323                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
324                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
325                        }
326                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
327                            Date nextRunTime = action.getPendingAge();
328                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
329                                    - System.currentTimeMillis());
330                        }
331                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
332                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
333                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
334                        }
335                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
336                            Date nextRunTime = action.getPendingAge();
337                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
338                                    - System.currentTimeMillis());
339    
340                        }
341                        else if (action.getStatus() == WorkflowActionBean.Status.OK
342                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
343                            queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
344                        }
345                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
346                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
347                        }
348                    }
349                    catch (Exception ex) {
350                        log.error("Exception, {0}", ex.getMessage(), ex);
351                    }
352                }
353    
354            }
355    
356            /**
357             * Adds callables to a list. If the number of callables in the list reaches {@link
358             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
359             *
360             * @param callable the callable to queue.
361             */
362            private void queueCallable(XCallable<?> callable) {
363                if (callables == null) {
364                    callables = new ArrayList<XCallable<?>>();
365                }
366                callables.add(callable);
367                if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
368                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
369                    if (ret == false) {
370                        XLog.getLog(getClass()).warn(
371                                "Unable to queue the callables commands for RecoveryService. "
372                                        + "Most possibly command queue is full. Queue size is :"
373                                        + Services.get().get(CallableQueueService.class).queueSize());
374                    }
375                    callables = new ArrayList<XCallable<?>>();
376                }
377            }
378    
379            /**
380             * Adds callables to a list. If the number of callables in the list reaches {@link
381             * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
382             * of the callables in the list. The callables list and the delay is reset.
383             *
384             * @param callable the callable to queue.
385             * @param delay the delay for the callable.
386             */
387            private void queueCallable(XCallable<?> callable, long delay) {
388                if (delayedCallables == null) {
389                    delayedCallables = new ArrayList<XCallable<?>>();
390                }
391                this.delay = Math.max(this.delay, delay);
392                delayedCallables.add(callable);
393                if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
394                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
395                    if (ret == false) {
396                        XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
397                                + "Most possibly Callable queue is full. Queue size is :"
398                                + Services.get().get(CallableQueueService.class).queueSize());
399                    }
400                    delayedCallables = new ArrayList<XCallable<?>>();
401                    this.delay = 0;
402                }
403            }
404        }
405    
406        /**
407         * Initializes the RecoveryService.
408         *
409         * @param services services instance.
410         */
411        @Override
412        public void init(Services services) {
413            Configuration conf = services.getConf();
414            Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
415                    CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
416            services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
417                                                          SchedulerService.Unit.SEC);
418        }
419    
420        /**
421         * Destroy the Recovery Service.
422         */
423        @Override
424        public void destroy() {
425        }
426    
427        /**
428         * Return the public interface for the Recovery Service.
429         *
430         * @return {@link RecoveryService}.
431         */
432        @Override
433        public Class<? extends Service> getInterface() {
434            return RecoveryService.class;
435        }
436    
437        /**
438         * Merge Bundle job config and the configuration from the coord job to pass
439         * to Coord Engine
440         *
441         * @param coordElem the coordinator configuration
442         * @return Configuration merged configuration
443         * @throws CommandException thrown if failed to merge configuration
444         */
445        private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
446            XLog.Info.get().clear();
447            XLog log = XLog.getLog("RecoveryService");
448    
449            String jobConf = bundleJob.getConf();
450            // Step 1: runConf = jobConf
451            Configuration runConf = null;
452            try {
453                runConf = new XConfiguration(new StringReader(jobConf));
454            }
455            catch (IOException e1) {
456                log.warn("Configuration parse error in:" + jobConf);
457                throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
458            }
459            // Step 2: Merge local properties into runConf
460            // extract 'property' tags under 'configuration' block in the coordElem
461            // convert Element to XConfiguration
462            Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
463    
464            if (localConfigElement != null) {
465                String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
466                Configuration localConf;
467                try {
468                    localConf = new XConfiguration(new StringReader(strConfig));
469                }
470                catch (IOException e1) {
471                    log.warn("Configuration parse error in:" + strConfig);
472                    throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
473                }
474    
475                // copy configuration properties in the coordElem to the runConf
476                XConfiguration.copy(localConf, runConf);
477            }
478    
479            // Step 3: Extract value of 'app-path' in coordElem, save it as a
480            // new property called 'oozie.coord.application.path', and normalize.
481            String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
482            runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
483            // Normalize coordinator appPath here;
484            try {
485                JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
486            }
487            catch (IOException e) {
488                throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
489            }
490            return runConf;
491        }
492    }