/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.wf.ActionEndXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;

/**
 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then
 * queues them for execution.
 */
public class RecoveryService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
    public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
    public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
    public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
    /**
     * Time interval, in seconds, at which the recovery service will be scheduled to run.
     */
    public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
    /**
     * The number of callables to be queued in a batch.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";

    /**
     * Delay for the push missing dependencies in milliseconds.
     */
    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = CONF_PREFIX + "push.dependency.interval";

    /**
     * Age of actions to queue, in seconds.
     */
    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
    /**
     * Age of coordinator jobs to recover, in seconds.
     */
    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";

    /**
     * Age of Bundle jobs to recover, in seconds.
     */
    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";

    private static final String INSTRUMENTATION_GROUP = "recovery";
    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";


    /**
     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
     * queuing of commands.
     */
    static class RecoveryRunnable implements Runnable {
        private final long olderThan;
        private final long coordOlderThan;
        private final long bundleOlderThan;
        // Largest delay requested by any callable currently held in delayedCallables.
        private long delay = 0;
        // Pending batches; null means "nothing accumulated since the last flush".
        private List<XCallable<?>> callables;
        private List<XCallable<?>> delayedCallables;
        private StringBuilder msg = null;
        private JPAService jpaService = null;

        /**
         * @param olderThan age threshold passed to the pending workflow action query.
         * @param coordOlderThan age threshold passed to the coordinator action queries.
         * @param bundleOlderThan age threshold passed to the waiting bundle action query.
         */
        public RecoveryRunnable(long olderThan, long coordOlderThan, long bundleOlderThan) {
            this.olderThan = olderThan;
            this.coordOlderThan = coordOlderThan;
            this.bundleOlderThan = bundleOlderThan;
        }

        /**
         * Runs one recovery pass: workflow actions, coordinator actions, READY coordinator actions and bundle
         * actions, then queues whatever is left in the partially filled batches.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            msg = new StringBuilder();
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                // Every recovery pass dereferences jpaService; without this guard a missing service surfaces as
                // an uncaught NullPointerException thrown out of run().
                // NOTE(review): if the scheduler is backed by a ScheduledExecutorService, such an exception
                // would also cancel all future runs - confirm against SchedulerService.
                log.warn("JPAService is not available, skipping this RecoveryService run");
                return;
            }
            runWFRecovery();
            runCoordActionRecovery();
            runCoordActionRecoveryForReady();
            runBundleRecovery();
            log.debug("QUEUING [{0}] for potential recovery", msg.toString());
            flushCallables();
            flushDelayedCallables();
        }

        /**
         * Recover bundle actions returned by {@link BundleActionsGetWaitingOlderJPAExecutor}: PREP actions get
         * their coordinator re-submitted, other states get the matching kill/suspend/resume coordinator command.
         */
        private void runBundleRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            List<BundleActionBean> bactions = null;
            try {
                bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Error reading bundle actions from database", ex);
                return;
            }
            msg.append(", BUNDLE_ACTIONS : ").append(bactions.size());
            for (BundleActionBean baction : bactions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
                    Job.Status status = baction.getStatus();
                    // The PREP branch below works only from the bundle id and the coordinator name, so a missing
                    // coordinator id must not exclude PREP actions from recovery (presumably the id is assigned
                    // only once the coordinator has been submitted - confirm against the bundle start command).
                    // Every other branch needs the coordinator id.
                    if (baction.getCoordId() == null && status != Job.Status.PREP) {
                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
                        continue;
                    }
                    if (status == Job.Status.PREP) {
                        resubmitCoordinator(baction);
                    }
                    else if (status == Job.Status.KILLED) {
                        queueCallable(new CoordKillXCommand(baction.getCoordId()));
                    }
                    else if (status == Job.Status.SUSPENDED || status == Job.Status.SUSPENDEDWITHERROR) {
                        queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
                    }
                    else if (status == Job.Status.RUNNING || status == Job.Status.RUNNINGWITHERROR) {
                        queueCallable(new CoordResumeXCommand(baction.getCoordId()));
                    }
                }
                catch (Exception ex) {
                    // Best effort: one broken action must not stop recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Queues a {@link CoordSubmitXCommand} for every coordinator element of the owning bundle's job XML whose
         * name matches the given bundle action.
         *
         * @param baction the PREP bundle action to recover.
         * @throws Exception if the bundle job cannot be loaded, its XML cannot be parsed or the configuration
         *         cannot be merged; the caller logs it and moves on to the next action.
         */
        private void resubmitCoordinator(BundleActionBean baction) throws Exception {
            BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
            if (bundleJob == null) {
                return;
            }
            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
            for (Element coordElem : coordElems) {
                Attribute name = coordElem.getAttribute("name");
                // getAttribute yields null when the attribute is absent; skip such elements instead of letting
                // a NullPointerException abort the remaining coordinator elements of this bundle.
                if (name != null && name.getValue().equals(baction.getCoordName())) {
                    Configuration coordConf = mergeConfig(coordElem, bundleJob);
                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
                    queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getId(), name.getValue()));
                }
            }
        }

        /**
         * Recover coordinator actions that are staying in WAITING or SUBMITTED too long
         */
        private void runCoordActionRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            long pushMissingDepInterval = Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_INTERVAL, 200);
            // Each push dependency check is scheduled one interval later than the previous one.
            long pushMissingDepDelay = pushMissingDepInterval;
            List<CoordinatorActionBean> cactions = null;
            try {
                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Error reading coord actions from database", ex);
                return;
            }
            msg.append(", COORD_ACTIONS : ").append(cactions.size());
            for (CoordinatorActionBean caction : cactions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
                    CoordinatorActionBean.Status status = caction.getStatus();
                    if (status == CoordinatorActionBean.Status.WAITING) {
                        queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
                        log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
                                + caction.getId());
                        if (caction.getPushMissingDependencies() != null
                                && caction.getPushMissingDependencies().length() != 0) {
                            queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
                                    pushMissingDepDelay);
                            pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
                            log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
                                    + caction.getId());
                        }
                    }
                    else if (status == CoordinatorActionBean.Status.SUBMITTED) {
                        CoordinatorJobBean coordJob = jpaService
                                .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
                        queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
                                coordJob.getAppName(), caction.getJobId()));

                        log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
                                + caction.getId());
                    }
                    else if (status == CoordinatorActionBean.Status.SUSPENDED) {
                        if (caction.getExternalId() != null) {
                            queueCallable(new SuspendXCommand(caction.getExternalId()));
                            log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
                                    + caction.getId());
                        }
                    }
                    else if (status == CoordinatorActionBean.Status.KILLED) {
                        if (caction.getExternalId() != null) {
                            queueCallable(new KillXCommand(caction.getExternalId()));
                            log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
                        }
                    }
                    else if (status == CoordinatorActionBean.Status.RUNNING) {
                        if (caction.getExternalId() != null) {
                            queueCallable(new ResumeXCommand(caction.getExternalId()));
                            log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
                        }
                    }
                }
                catch (Exception ex) {
                    // Best effort: one broken action must not stop recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Recover coordinator actions that are staying in READY too long
         */
        private void runCoordActionRecoveryForReady() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());

            try {
                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(
                        coordOlderThan));
                msg.append(", COORD_READY_JOBS : ").append(jobids.size());
                for (String jobid : jobids) {
                    queueCallable(new CoordActionReadyXCommand(jobid));

                    log.info("Recover READY coord actions for jobid :" + jobid);
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        /**
         * Recover wf actions
         */
        private void runWFRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            // queue command for action recovery
            List<WorkflowActionBean> actions = null;
            try {
                actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Exception while reading pending actions from storage", ex);
                return;
            }
            msg.append(" WF_ACTIONS ").append(actions.size());

            for (WorkflowActionBean action : actions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
                    WorkflowActionBean.Status status = action.getStatus();
                    if (status == WorkflowActionBean.Status.PREP
                            || status == WorkflowActionBean.Status.START_MANUAL) {
                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                    }
                    else if (status == WorkflowActionBean.Status.START_RETRY) {
                        // Delay the command until the time reported by getPendingAge().
                        Date nextRunTime = action.getPendingAge();
                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()),
                                nextRunTime.getTime() - System.currentTimeMillis());
                    }
                    else if (status == WorkflowActionBean.Status.DONE
                            || status == WorkflowActionBean.Status.END_MANUAL) {
                        queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
                    }
                    else if (status == WorkflowActionBean.Status.END_RETRY) {
                        // Delay the command until the time reported by getPendingAge().
                        Date nextRunTime = action.getPendingAge();
                        queueCallable(new ActionEndXCommand(action.getId(), action.getType()),
                                nextRunTime.getTime() - System.currentTimeMillis());
                    }
                    else if (status == WorkflowActionBean.Status.OK
                            || status == WorkflowActionBean.Status.ERROR) {
                        queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
                    }
                    else if (status == WorkflowActionBean.Status.USER_RETRY) {
                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                    }
                }
                catch (Exception ex) {
                    // Best effort: one broken action must not stop recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
         *
         * @param callable the callable to queue.
         */
        private void queueCallable(XCallable<?> callable) {
            if (callables == null) {
                callables = new ArrayList<XCallable<?>>();
            }
            callables.add(callable);
            if (callables.size() == batchSize()) {
                flushCallables();
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
         * of the callables in the list. The callables list and the delay is reset.
         *
         * @param callable the callable to queue.
         * @param delay the delay for the callable.
         */
        private void queueCallable(XCallable<?> callable, long delay) {
            if (delayedCallables == null) {
                delayedCallables = new ArrayList<XCallable<?>>();
            }
            this.delay = Math.max(this.delay, delay);
            delayedCallables.add(callable);
            if (delayedCallables.size() == batchSize()) {
                flushDelayedCallables();
            }
        }

        /**
         * @return the configured {@link RecoveryService#CONF_CALLABLE_BATCH_SIZE}, 10 when it is not set.
         */
        private int batchSize() {
            return Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10);
        }

        /**
         * Hands the accumulated immediate callables to the {@link CallableQueueService} as one serial batch and
         * resets the list. Does nothing when no callable has been accumulated. A rejected batch is only logged.
         */
        private void flushCallables() {
            if (callables == null) {
                return;
            }
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(callables)) {
                XLog.getLog(getClass()).warn(
                        "Unable to queue the callables commands for RecoveryService. "
                                + "Most possibly command queue is full. Queue size is :"
                                + queueService.queueSize());
            }
            callables = null;
        }

        /**
         * Hands the accumulated delayed callables to the {@link CallableQueueService} as one serial batch, using
         * the largest delay requested for any of them, then resets the list and the delay. Does nothing when no
         * callable has been accumulated. A rejected batch is only logged.
         */
        private void flushDelayedCallables() {
            if (delayedCallables == null) {
                return;
            }
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(delayedCallables, this.delay)) {
                XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
                        + "Most possibly Callable queue is full. Queue size is :"
                        + queueService.queueSize());
            }
            delayedCallables = null;
            this.delay = 0;
        }
    }

    /**
     * Initializes the RecoveryService.
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120),
                conf.getInt(CONF_COORD_OLDER_THAN, 600), conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
                SchedulerService.Unit.SEC);
    }

    /**
     * Reads the interval at which the recovery runnable is scheduled.
     *
     * @param conf configuration to read {@link #CONF_SERVICE_INTERVAL} from.
     * @return the configured interval, 60 when it is not set.
     */
    public int getRecoveryServiceInterval(Configuration conf) {
        return conf.getInt(CONF_SERVICE_INTERVAL, 60);
    }

    /**
     * Destroy the Recovery Service.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the Recovery Service.
     *
     * @return {@link RecoveryService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return RecoveryService.class;
    }

    /**
     * Merge Bundle job config and the configuration from the coord job to pass
     * to Coord Engine
     *
     * @param coordElem the coordinator configuration
     * @param bundleJob the bundle job whose configuration is the base of the merge
     * @return Configuration merged configuration
     * @throws CommandException thrown if failed to merge configuration
     */
    private static Configuration mergeConfig(Element coordElem, BundleJobBean bundleJob) throws CommandException {
        XLog.Info.get().clear();
        XLog log = XLog.getLog("RecoveryService");

        String jobConf = bundleJob.getConf();
        // Step 1: runConf = jobConf
        Configuration runConf = null;
        try {
            runConf = new XConfiguration(new StringReader(jobConf));
        }
        catch (IOException e1) {
            log.warn("Configuration parse error in:" + jobConf);
            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
        }
        // Step 2: Merge local properties into runConf
        // extract 'property' tags under 'configuration' block in the coordElem
        // convert Element to XConfiguration
        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());

        if (localConfigElement != null) {
            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
            Configuration localConf;
            try {
                localConf = new XConfiguration(new StringReader(strConfig));
            }
            catch (IOException e1) {
                log.warn("Configuration parse error in:" + strConfig);
                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
            }

            // copy configuration properties in the coordElem to the runConf
            XConfiguration.copy(localConf, runConf);
        }

        // Step 3: Extract value of 'app-path' in coordElem, save it as a
        // new property called 'oozie.coord.application.path', and normalize.
        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
        // Normalize coordinator appPath here;
        try {
            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
        }
        catch (IOException e) {
            // Pass the cause along, as the E1306/E1307 paths above do, so the original failure is not lost.
            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH), e);
        }
        return runConf;
    }
}