/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.wf.ActionEndXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;

/**
 * The Recovery Service looks, at a configured interval, for workflow actions, coordinator actions and bundle actions
 * older than a configured age and queues the commands needed to recover them.
 */
public class RecoveryService implements Service {

    public static final String RECOVERY_SERVICE_CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
    public static final String CONF_PREFIX_WF_ACTIONS = RECOVERY_SERVICE_CONF_PREFIX + "wf.actions.";
    public static final String CONF_PREFIX_COORD = RECOVERY_SERVICE_CONF_PREFIX + "coord.";
    public static final String CONF_PREFIX_BUNDLE = RECOVERY_SERVICE_CONF_PREFIX + "bundle.";
    /**
     * Time interval, in seconds, at which the recovery service will be scheduled to run.
     */
    public static final String CONF_SERVICE_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "interval";
    /**
     * The number of callables to be queued in a batch.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = RECOVERY_SERVICE_CONF_PREFIX + "callable.batch.size";

    /**
     * Delay for the push missing dependencies in milliseconds.
     */
    public static final String CONF_PUSH_DEPENDENCY_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "push.dependency.interval";

    /**
     * Age of actions to queue, in seconds.
     */
    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";

    /**
     * Multiplier applied to {@link #ONE_DAY_MILLISCONDS}; the product is subtracted from the current time to obtain
     * the created-time bound passed to the pending workflow actions query.
     */
    public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval";

    /**
     * Age of coordinator jobs to recover, in seconds.
     */
    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";

    /**
     * Age of Bundle jobs to recover, in seconds.
     */
    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";

    private static final String INSTRUMENTATION_GROUP = "recovery";
    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";

    /**
     * Unit, in milliseconds, by which {@link #CONF_WF_ACTIONS_CREATED_TIME_INTERVAL} is multiplied.
     * <p>
     * NOTE(review): despite its name this evaluates to 25 hours (90,000,000 ms), not 24. The value is kept as-is
     * because it is public and changes the recovery window; confirm whether the extra hour is intentional.
     */
    public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000;

    /**
     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
     * queuing of commands.
     */
    static class RecoveryRunnable implements Runnable {
        private final long olderThan;
        private final long coordOlderThan;
        private final long bundleOlderThan;
        // Largest delay requested for any callable currently held in delayedCallables.
        private long delay = 0;
        private List<XCallable<?>> callables;
        private List<XCallable<?>> delayedCallables;
        // Summary of what was found in one run; rebuilt at the start of every run().
        private StringBuilder msg = null;
        private JPAService jpaService = null;

        /**
         * @param olderThan age bound passed to the pending workflow actions query.
         * @param coordOlderThan age bound passed to the coordinator action recovery queries.
         * @param bundleOlderThan age bound passed to the waiting bundle actions query.
         */
        public RecoveryRunnable(long olderThan, long coordOlderThan, long bundleOlderThan) {
            this.olderThan = olderThan;
            this.coordOlderThan = coordOlderThan;
            this.bundleOlderThan = bundleOlderThan;
        }

        /**
         * Runs the workflow, coordinator and bundle recovery passes in turn, then queues whatever callables are
         * still held in the partially filled batches.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            msg = new StringBuilder();
            jpaService = Services.get().get(JPAService.class);
            runWFRecovery();
            runCoordActionRecovery();
            runCoordActionRecoveryForReady();
            runBundleRecovery();
            log.debug("QUEUING [{0}] for potential recovery", msg.toString());
            // Flush the batches that never reached the configured batch size.
            if (callables != null) {
                submitCallables();
                callables = null;
            }
            if (delayedCallables != null) {
                submitDelayedCallables();
                delayedCallables = null;
                this.delay = 0;
            }
        }

        /**
         * Recover bundle actions returned by the waiting-bundle-actions query: PREP actions without a coordinator
         * id get their coordinator re-submitted, the other handled states re-issue the matching coordinator
         * kill/suspend/resume command.
         */
        private void runBundleRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            List<BundleActionBean> bactions = null;
            try {
                bactions = BundleActionQueryExecutor.getInstance().getList(
                        BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan);
            }
            catch (JPAExecutorException ex) {
                log.warn("Error reading bundle actions from database", ex);
                return;
            }
            msg.append(", BUNDLE_ACTIONS : " + bactions.size());
            for (BundleActionBean baction : bactions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
                    if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) {
                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
                        continue;
                    }
                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) {
                        if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) {
                            // Without the JPA service the bundle job was never loaded and the code below
                            // dereferenced null; report that explicitly and move on to the next action.
                            if (jpaService == null) {
                                log.error("JPAService is not available, cannot recover Bundle action "
                                        + baction.getBundleActionId());
                                continue;
                            }
                            BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get(
                                    BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId());
                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
                            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
                            for (Element coordElem : coordElems) {
                                Attribute name = coordElem.getAttribute("name");
                                String coordName = name.getValue();
                                Configuration coordConf = mergeConfig(coordElem, bundleJob);
                                try {
                                    coordName = ELUtils.resolveAppName(coordName, coordConf);
                                }
                                catch (Exception e) {
                                    log.error("Error evaluating coord name " + e.getMessage(), e);
                                    continue;
                                }
                                // Only the coordinator this bundle action stands for is re-submitted.
                                if (coordName.equals(baction.getCoordName())) {
                                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
                                    queueCallable(new CoordSubmitXCommand(coordConf,
                                            bundleJob.getId(), coordName));
                                }
                            }
                        }
                        else if (baction.getStatus() == Job.Status.KILLED) {
                            queueCallable(new CoordKillXCommand(baction.getCoordId()));
                        }
                        else if (baction.getStatus() == Job.Status.SUSPENDED
                                || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
                            queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
                        }
                        else if (baction.getStatus() == Job.Status.RUNNING
                                || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
                            queueCallable(new CoordResumeXCommand(baction.getCoordId()));
                        }
                    }
                }
                catch (Exception ex) {
                    // A failure on one bundle action must not stop recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Recover coordinator actions returned by the recovery query. WAITING, SUBMITTED, SUSPENDED, KILLED and
         * RUNNING actions are handled; each gets the command matching its status re-queued.
         */
        private void runCoordActionRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL);
            long pushMissingDepDelay = pushMissingDepInterval;
            List<CoordinatorActionBean> cactions = null;
            try {
                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Error reading coord actions from database", ex);
                return;
            }
            msg.append(", COORD_ACTIONS : " + cactions.size());
            for (CoordinatorActionBean caction : cactions) {
                try {
                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) {
                        Services.get().get(InstrumentationService.class).get()
                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
                        if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
                            queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
                            log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :"
                                    + caction.getId());
                            if (caction.getPushMissingDependencies() != null
                                    && caction.getPushMissingDependencies().length() != 0) {
                                queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
                                        pushMissingDepDelay);
                                // Each further push-dependency check is requested one interval later.
                                pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
                                log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
                                        + caction.getId());
                            }
                        }
                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
                            CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
                                    CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId());
                            queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
                                    coordJob.getAppName(), caction.getJobId()));

                            log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
                                    + caction.getId());
                        }
                        else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
                            if (caction.getExternalId() != null && caction.getPending() > 1) {
                                queueCallable(new SuspendXCommand(caction.getExternalId()));
                                log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
                                        + caction.getId());
                            }
                        }
                        else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
                            if (caction.getExternalId() != null) {
                                queueCallable(new KillXCommand(caction.getExternalId()));
                                log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
                            }
                        }
                        else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
                            if (caction.getExternalId() != null) {
                                queueCallable(new ResumeXCommand(caction.getExternalId()));
                                log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
                            }
                        }
                    }
                }
                catch (Exception ex) {
                    // A failure on one coordinator action must not stop recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Recover coordinator actions that are staying in READY too long
         */
        private void runCoordActionRecoveryForReady() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());

            try {
                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan));
                jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids);
                msg.append(", COORD_READY_JOBS : " + jobids.size());
                for (String jobid : jobids) {
                    queueCallable(new CoordActionReadyXCommand(jobid));

                    log.info("Recover READY coord actions for jobid :" + jobid);
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        /**
         * Recover wf actions returned by the pending actions query, re-queuing the start, end or signal command
         * matching each action's status.
         */
        private void runWFRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());

            // Created-time bound handed to the query: now minus the configured number of ONE_DAY_MILLISCONDS units.
            long createdTimeBound = new Date().getTime()
                    - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL) * ONE_DAY_MILLISCONDS;

            List<WorkflowActionBean> actions = null;
            try {
                actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS,
                        olderThan, createdTimeBound);
            }
            catch (JPAExecutorException ex) {
                log.warn("Exception while reading pending actions from storage", ex);
                return;
            }
            msg.append(" WF_ACTIONS " + actions.size());

            for (WorkflowActionBean action : actions) {
                try {
                    if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) {
                        Services.get().get(InstrumentationService.class).get()
                                .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
                        if (action.getStatus() == WorkflowActionBean.Status.PREP
                                || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                        }
                        else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
                            // Retries are delayed until the time reported by getPendingAge().
                            Date nextRunTime = action.getPendingAge();
                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()),
                                    nextRunTime.getTime() - System.currentTimeMillis());
                        }
                        else if (action.getStatus() == WorkflowActionBean.Status.DONE
                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
                        }
                        else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
                            Date nextRunTime = action.getPendingAge();
                            queueCallable(new ActionEndXCommand(action.getId(), action.getType()),
                                    nextRunTime.getTime() - System.currentTimeMillis());
                        }
                        else if (action.getStatus() == WorkflowActionBean.Status.OK
                                || action.getStatus() == WorkflowActionBean.Status.ERROR) {
                            queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
                        }
                        else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
                            queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                        }
                    }
                }
                catch (Exception ex) {
                    // A failure on one workflow action must not stop recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
         *
         * @param callable the callable to queue.
         */
        private void queueCallable(XCallable<?> callable) {
            if (callables == null) {
                callables = new ArrayList<XCallable<?>>();
            }
            callables.add(callable);
            // NOTE(review): the delayed overload reads the same key through ConfigurationService.getInt();
            // confirm both lookups resolve to the same value.
            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
                submitCallables();
                callables = new ArrayList<XCallable<?>>();
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
         * of the callables in the list. The callables list and the delay is reset.
         *
         * @param callable the callable to queue.
         * @param delay the delay for the callable.
         */
        private void queueCallable(XCallable<?> callable, long delay) {
            if (delayedCallables == null) {
                delayedCallables = new ArrayList<XCallable<?>>();
            }
            this.delay = Math.max(this.delay, delay);
            delayedCallables.add(callable);
            if (delayedCallables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
                submitDelayedCallables();
                delayedCallables = new ArrayList<XCallable<?>>();
                this.delay = 0;
            }
        }

        /**
         * Hands the current {@code callables} batch to the callable queue and logs a warning when the queue
         * refuses it. The caller is responsible for resetting the list afterwards.
         */
        private void submitCallables() {
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(callables)) {
                XLog.getLog(getClass()).warn(
                        "Unable to queue the callables commands for RecoveryService. "
                                + "Most possibly command queue is full. Queue size is :"
                                + Services.get().get(CallableQueueService.class).queueSize());
            }
        }

        /**
         * Hands the current {@code delayedCallables} batch, with the accumulated {@code delay}, to the callable
         * queue and logs a warning when the queue refuses it. The caller is responsible for resetting the list
         * and the delay afterwards.
         */
        private void submitDelayedCallables() {
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(delayedCallables, this.delay)) {
                XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
                        + "Most possibly Callable queue is full. Queue size is :"
                        + Services.get().get(CallableQueueService.class).queueSize());
            }
        }
    }

    /**
     * Initializes the RecoveryService: builds the recovery runnable from the configured age bounds and schedules
     * it with an initial delay of 10 seconds and the configured interval.
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable recoveryRunnable = new RecoveryRunnable(
                ConfigurationService.getInt(conf, CONF_WF_ACTIONS_OLDER_THAN),
                ConfigurationService.getInt(conf, CONF_COORD_OLDER_THAN),
                ConfigurationService.getInt(conf, CONF_BUNDLE_OLDER_THAN));
        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
                SchedulerService.Unit.SEC);
    }

    /**
     * Return the interval, in seconds, at which the recovery runnable is scheduled.
     *
     * @param conf configuration to read {@link #CONF_SERVICE_INTERVAL} from.
     * @return the configured interval.
     */
    public int getRecoveryServiceInterval(Configuration conf) {
        return ConfigurationService.getInt(conf, CONF_SERVICE_INTERVAL);
    }

    /**
     * Destroy the Recovery Service.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the Recovery Service.
     *
     * @return {@link RecoveryService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return RecoveryService.class;
    }

    /**
     * Merge Bundle job config and the configuration from the coord job to pass
     * to Coord Engine
     *
     * @param coordElem the coordinator configuration
     * @param bundleJob the bundle job whose configuration is the starting point of the merge
     * @return Configuration merged configuration
     * @throws CommandException thrown if failed to merge configuration
     */
    private static Configuration mergeConfig(Element coordElem, BundleJobBean bundleJob) throws CommandException {
        XLog.Info.get().clear();
        XLog log = XLog.getLog("RecoveryService");

        String jobConf = bundleJob.getConf();
        // Step 1: runConf = jobConf
        Configuration runConf = null;
        try {
            runConf = new XConfiguration(new StringReader(jobConf));
        }
        catch (IOException e1) {
            log.warn("Configuration parse error in:" + jobConf);
            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
        }
        // Step 2: Merge local properties into runConf
        // extract 'property' tags under 'configuration' block in the coordElem
        // convert Element to XConfiguration
        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());

        if (localConfigElement != null) {
            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
            Configuration localConf;
            try {
                localConf = new XConfiguration(new StringReader(strConfig));
            }
            catch (IOException e1) {
                log.warn("Configuration parse error in:" + strConfig);
                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
            }

            // copy configuration properties in the coordElem to the runConf
            XConfiguration.copy(localConf, runConf);
        }

        // Step 3: Extract value of 'app-path' in coordElem, save it as a
        // new property called 'oozie.coord.application.path', and normalize.
        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
        // Normalize coordinator appPath here;
        try {
            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
        }
        catch (IOException e) {
            // Hand the IOException over as well, like the parse-error handlers above, so the root cause is kept.
            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH), e);
        }
        return runConf;
    }
}