This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.util.ArrayList;
023 import java.util.Date;
024 import java.util.List;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.BundleActionBean;
028 import org.apache.oozie.BundleJobBean;
029 import org.apache.oozie.CoordinatorActionBean;
030 import org.apache.oozie.CoordinatorJobBean;
031 import org.apache.oozie.ErrorCode;
032 import org.apache.oozie.WorkflowActionBean;
033 import org.apache.oozie.client.Job;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
037 import org.apache.oozie.command.coord.CoordActionReadyXCommand;
038 import org.apache.oozie.command.coord.CoordActionStartXCommand;
039 import org.apache.oozie.command.coord.CoordKillXCommand;
040 import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
041 import org.apache.oozie.command.coord.CoordResumeXCommand;
042 import org.apache.oozie.command.coord.CoordSubmitXCommand;
043 import org.apache.oozie.command.coord.CoordSuspendXCommand;
044 import org.apache.oozie.command.wf.ActionEndXCommand;
045 import org.apache.oozie.command.wf.ActionStartXCommand;
046 import org.apache.oozie.command.wf.KillXCommand;
047 import org.apache.oozie.command.wf.ResumeXCommand;
048 import org.apache.oozie.command.wf.SignalXCommand;
049 import org.apache.oozie.command.wf.SuspendXCommand;
050 import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
051 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
052 import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
053 import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
054 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
055 import org.apache.oozie.executor.jpa.JPAExecutorException;
056 import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
057 import org.apache.oozie.util.JobUtils;
058 import org.apache.oozie.util.XCallable;
059 import org.apache.oozie.util.XConfiguration;
060 import org.apache.oozie.util.XLog;
061 import org.apache.oozie.util.XmlUtils;
062 import org.jdom.Attribute;
063 import org.jdom.Element;
064
065 /**
066 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
067 * queues them for execution.
068 */
069 public class RecoveryService implements Service {
070
071 public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
072 public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
073 public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
074 public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
075 /**
076 * Time interval, in seconds, at which the recovery service will be scheduled to run.
077 */
078 public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
079 /**
080 * The number of callables to be queued in a batch.
081 */
082 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
083
084 /**
085 * Delay for the push missing dependencies in milliseconds.
086 */
087 public static final String CONF_PUSH_DEPENDENCY_INTERVAL = CONF_PREFIX + "push.dependency.interval";
088
089 /**
090 * Age of actions to queue, in seconds.
091 */
092 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
093 /**
094 * Age of coordinator jobs to recover, in seconds.
095 */
096 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";
097
098 /**
099 * Age of Bundle jobs to recover, in seconds.
100 */
101 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";
102
103 private static final String INSTRUMENTATION_GROUP = "recovery";
104 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
105 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
106 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
107
108
109 /**
110 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
111 * queuing of commands.
112 */
113 static class RecoveryRunnable implements Runnable {
114 private final long olderThan;
115 private final long coordOlderThan;
116 private final long bundleOlderThan;
117 private long delay = 0;
118 private List<XCallable<?>> callables;
119 private List<XCallable<?>> delayedCallables;
120 private StringBuilder msg = null;
121 private JPAService jpaService = null;
122
123 public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
124 this.olderThan = olderThan;
125 this.coordOlderThan = coordOlderThan;
126 this.bundleOlderThan = bundleOlderThan;
127 }
128
129 public void run() {
130 XLog.Info.get().clear();
131 XLog log = XLog.getLog(getClass());
132 msg = new StringBuilder();
133 jpaService = Services.get().get(JPAService.class);
134 runWFRecovery();
135 runCoordActionRecovery();
136 runCoordActionRecoveryForReady();
137 runBundleRecovery();
138 log.debug("QUEUING [{0}] for potential recovery", msg.toString());
139 boolean ret = false;
140 if (null != callables) {
141 ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
142 if (ret == false) {
143 log.warn("Unable to queue the callables commands for RecoveryService. "
144 + "Most possibly command queue is full. Queue size is :"
145 + Services.get().get(CallableQueueService.class).queueSize());
146 }
147 callables = null;
148 }
149 if (null != delayedCallables) {
150 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
151 if (ret == false) {
152 log.warn("Unable to queue the delayedCallables commands for RecoveryService. "
153 + "Most possibly Callable queue is full. Queue size is :"
154 + Services.get().get(CallableQueueService.class).queueSize());
155 }
156 delayedCallables = null;
157 this.delay = 0;
158 }
159 }
160
161 private void runBundleRecovery(){
162 XLog.Info.get().clear();
163 XLog log = XLog.getLog(getClass());
164 List<BundleActionBean> bactions = null;
165 try {
166 bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
167 }
168 catch (JPAExecutorException ex) {
169 log.warn("Error reading bundle actions from database", ex);
170 return;
171 }
172 msg.append(", BUNDLE_ACTIONS : " + bactions.size());
173 for (BundleActionBean baction : bactions) {
174 try {
175 Services.get().get(InstrumentationService.class).get()
176 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
177 if (baction.getCoordId() == null) {
178 log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
179 continue;
180 }
181 if (baction.getStatus() == Job.Status.PREP) {
182 BundleJobBean bundleJob = null;
183
184 if (jpaService != null) {
185 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
186 }
187 if (bundleJob != null) {
188 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
189 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
190 for (Element coordElem : coordElems) {
191 Attribute name = coordElem.getAttribute("name");
192 if (name.getValue().equals(baction.getCoordName())) {
193 Configuration coordConf = mergeConfig(coordElem, bundleJob);
194 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
195 queueCallable(new CoordSubmitXCommand(coordConf,
196 bundleJob.getId(), name.getValue()));
197 }
198 }
199 }
200
201 }
202 else if (baction.getStatus() == Job.Status.KILLED) {
203 queueCallable(new CoordKillXCommand(baction.getCoordId()));
204 }
205 else if (baction.getStatus() == Job.Status.SUSPENDED
206 || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
207 queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
208 }
209 else if (baction.getStatus() == Job.Status.RUNNING
210 || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
211 queueCallable(new CoordResumeXCommand(baction.getCoordId()));
212 }
213 }
214 catch (Exception ex) {
215 log.error("Exception, {0}", ex.getMessage(), ex);
216 }
217 }
218
219 }
220
221 /**
222 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
223 */
224 private void runCoordActionRecovery() {
225 XLog.Info.get().clear();
226 XLog log = XLog.getLog(getClass());
227 long pushMissingDepInterval = Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_INTERVAL, 200);
228 long pushMissingDepDelay = pushMissingDepInterval;
229 List<CoordinatorActionBean> cactions = null;
230 try {
231 cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
232 }
233 catch (JPAExecutorException ex) {
234 log.warn("Error reading coord actions from database", ex);
235 return;
236 }
237 msg.append(", COORD_ACTIONS : " + cactions.size());
238 for (CoordinatorActionBean caction : cactions) {
239 try {
240 Services.get().get(InstrumentationService.class).get()
241 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
242 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
243 queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
244 log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
245 + caction.getId());
246 if (caction.getPushMissingDependencies() != null
247 && caction.getPushMissingDependencies().length() != 0) {
248 queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
249 pushMissingDepDelay);
250 pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
251 log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
252 + caction.getId());
253 }
254 }
255 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
256 CoordinatorJobBean coordJob = jpaService
257 .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
258 queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
259 coordJob.getAppName(), caction.getJobId()));
260
261 log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
262 + caction.getId());
263 }
264 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
265 if (caction.getExternalId() != null) {
266 queueCallable(new SuspendXCommand(caction.getExternalId()));
267 log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
268 + caction.getId());
269 }
270 }
271 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
272 if (caction.getExternalId() != null) {
273 queueCallable(new KillXCommand(caction.getExternalId()));
274 log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
275 }
276 }
277 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
278 if (caction.getExternalId() != null) {
279 queueCallable(new ResumeXCommand(caction.getExternalId()));
280 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
281 }
282 }
283 }
284 catch (Exception ex) {
285 log.error("Exception, {0}", ex.getMessage(), ex);
286 }
287 }
288
289
290 }
291
292 /**
293 * Recover coordinator actions that are staying in READY too long
294 */
295 private void runCoordActionRecoveryForReady() {
296 XLog.Info.get().clear();
297 XLog log = XLog.getLog(getClass());
298
299 try {
300 List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
301 msg.append(", COORD_READY_JOBS : " + jobids.size());
302 for (String jobid : jobids) {
303 queueCallable(new CoordActionReadyXCommand(jobid));
304
305 log.info("Recover READY coord actions for jobid :" + jobid);
306 }
307 }
308 catch (Exception ex) {
309 log.error("Exception, {0}", ex.getMessage(), ex);
310 }
311 }
312
313 /**
314 * Recover wf actions
315 */
316 private void runWFRecovery() {
317 XLog.Info.get().clear();
318 XLog log = XLog.getLog(getClass());
319 // queue command for action recovery
320 List<WorkflowActionBean> actions = null;
321 try {
322 actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
323 }
324 catch (JPAExecutorException ex) {
325 log.warn("Exception while reading pending actions from storage", ex);
326 return;
327 }
328 // log.debug("QUEUING[{0}] pending wf actions for potential recovery",
329 // actions.size());
330 msg.append(" WF_ACTIONS " + actions.size());
331
332 for (WorkflowActionBean action : actions) {
333 try {
334 Services.get().get(InstrumentationService.class).get()
335 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
336 if (action.getStatus() == WorkflowActionBean.Status.PREP
337 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
338 queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
339 }
340 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
341 Date nextRunTime = action.getPendingAge();
342 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime()
343 - System.currentTimeMillis());
344 }
345 else if (action.getStatus() == WorkflowActionBean.Status.DONE
346 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
347 queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
348 }
349 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
350 Date nextRunTime = action.getPendingAge();
351 queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime()
352 - System.currentTimeMillis());
353
354 }
355 else if (action.getStatus() == WorkflowActionBean.Status.OK
356 || action.getStatus() == WorkflowActionBean.Status.ERROR) {
357 queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
358 }
359 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
360 queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
361 }
362 }
363 catch (Exception ex) {
364 log.error("Exception, {0}", ex.getMessage(), ex);
365 }
366 }
367
368 }
369
370 /**
371 * Adds callables to a list. If the number of callables in the list reaches {@link
372 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
373 *
374 * @param callable the callable to queue.
375 */
376 private void queueCallable(XCallable<?> callable) {
377 if (callables == null) {
378 callables = new ArrayList<XCallable<?>>();
379 }
380 callables.add(callable);
381 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
382 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
383 if (ret == false) {
384 XLog.getLog(getClass()).warn(
385 "Unable to queue the callables commands for RecoveryService. "
386 + "Most possibly command queue is full. Queue size is :"
387 + Services.get().get(CallableQueueService.class).queueSize());
388 }
389 callables = new ArrayList<XCallable<?>>();
390 }
391 }
392
393 /**
394 * Adds callables to a list. If the number of callables in the list reaches {@link
395 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
396 * of the callables in the list. The callables list and the delay is reset.
397 *
398 * @param callable the callable to queue.
399 * @param delay the delay for the callable.
400 */
401 private void queueCallable(XCallable<?> callable, long delay) {
402 if (delayedCallables == null) {
403 delayedCallables = new ArrayList<XCallable<?>>();
404 }
405 this.delay = Math.max(this.delay, delay);
406 delayedCallables.add(callable);
407 if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
408 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay);
409 if (ret == false) {
410 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
411 + "Most possibly Callable queue is full. Queue size is :"
412 + Services.get().get(CallableQueueService.class).queueSize());
413 }
414 delayedCallables = new ArrayList<XCallable<?>>();
415 this.delay = 0;
416 }
417 }
418 }
419
420 /**
421 * Initializes the RecoveryService.
422 *
423 * @param services services instance.
424 */
425 @Override
426 public void init(Services services) {
427 Configuration conf = services.getConf();
428 Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
429 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
430 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
431 SchedulerService.Unit.SEC);
432 }
433
434 public int getRecoveryServiceInterval(Configuration conf){
435 return conf.getInt(CONF_SERVICE_INTERVAL, 60);
436 }
437
438 /**
439 * Destroy the Recovery Service.
440 */
441 @Override
442 public void destroy() {
443 }
444
445 /**
446 * Return the public interface for the Recovery Service.
447 *
448 * @return {@link RecoveryService}.
449 */
450 @Override
451 public Class<? extends Service> getInterface() {
452 return RecoveryService.class;
453 }
454
455 /**
456 * Merge Bundle job config and the configuration from the coord job to pass
457 * to Coord Engine
458 *
459 * @param coordElem the coordinator configuration
460 * @return Configuration merged configuration
461 * @throws CommandException thrown if failed to merge configuration
462 */
463 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException {
464 XLog.Info.get().clear();
465 XLog log = XLog.getLog("RecoveryService");
466
467 String jobConf = bundleJob.getConf();
468 // Step 1: runConf = jobConf
469 Configuration runConf = null;
470 try {
471 runConf = new XConfiguration(new StringReader(jobConf));
472 }
473 catch (IOException e1) {
474 log.warn("Configuration parse error in:" + jobConf);
475 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
476 }
477 // Step 2: Merge local properties into runConf
478 // extract 'property' tags under 'configuration' block in the coordElem
479 // convert Element to XConfiguration
480 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
481
482 if (localConfigElement != null) {
483 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
484 Configuration localConf;
485 try {
486 localConf = new XConfiguration(new StringReader(strConfig));
487 }
488 catch (IOException e1) {
489 log.warn("Configuration parse error in:" + strConfig);
490 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
491 }
492
493 // copy configuration properties in the coordElem to the runConf
494 XConfiguration.copy(localConf, runConf);
495 }
496
497 // Step 3: Extract value of 'app-path' in coordElem, save it as a
498 // new property called 'oozie.coord.application.path', and normalize.
499 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
500 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
501 // Normalize coordinator appPath here;
502 try {
503 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
504 }
505 catch (IOException e) {
506 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH));
507 }
508 return runConf;
509 }
510 }