001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.IOException; 021import java.io.StringReader; 022import java.util.ArrayList; 023import java.util.Date; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.oozie.BundleActionBean; 028import org.apache.oozie.BundleJobBean; 029import org.apache.oozie.CoordinatorActionBean; 030import org.apache.oozie.CoordinatorJobBean; 031import org.apache.oozie.ErrorCode; 032import org.apache.oozie.WorkflowActionBean; 033import org.apache.oozie.client.Job; 034import org.apache.oozie.client.OozieClient; 035import org.apache.oozie.command.CommandException; 036import org.apache.oozie.command.coord.CoordActionInputCheckXCommand; 037import org.apache.oozie.command.coord.CoordActionReadyXCommand; 038import org.apache.oozie.command.coord.CoordActionStartXCommand; 039import org.apache.oozie.command.coord.CoordKillXCommand; 040import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand; 041import org.apache.oozie.command.coord.CoordResumeXCommand; 042import org.apache.oozie.command.coord.CoordSubmitXCommand; 043import org.apache.oozie.command.coord.CoordSuspendXCommand; 044import org.apache.oozie.command.wf.ActionEndXCommand; 045import org.apache.oozie.command.wf.ActionStartXCommand; 046import org.apache.oozie.command.wf.KillXCommand; 047import org.apache.oozie.command.wf.ResumeXCommand; 048import org.apache.oozie.command.wf.SignalXCommand; 049import org.apache.oozie.command.wf.SuspendXCommand; 050import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 051import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 052import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor; 053import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor; 054import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 055import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 056import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 057import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 058import org.apache.oozie.executor.jpa.JPAExecutorException; 059import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 060import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 061import org.apache.oozie.util.JobUtils; 062import org.apache.oozie.util.XCallable; 063import org.apache.oozie.util.XConfiguration; 064import org.apache.oozie.util.XLog; 065import org.apache.oozie.util.XmlUtils; 066import org.jdom.Attribute; 067import org.jdom.Element; 068 069/** 070 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then 071 * queues them for execution. 072 */ 073public class RecoveryService implements Service { 074 075 public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService."; 076 public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions."; 077 public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord."; 078 public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle."; 079 /** 080 * Time interval, in seconds, at which the recovery service will be scheduled to run. 081 */ 082 public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval"; 083 /** 084 * The number of callables to be queued in a batch. 085 */ 086 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 087 088 /** 089 * Delay for the push missing dependencies in milliseconds. 090 */ 091 public static final String CONF_PUSH_DEPENDENCY_INTERVAL = CONF_PREFIX + "push.dependency.interval"; 092 093 /** 094 * Age of actions to queue, in seconds. 095 */ 096 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than"; 097 /** 098 * Age of coordinator jobs to recover, in seconds. 099 */ 100 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than"; 101 102 /** 103 * Age of Bundle jobs to recover, in seconds. 104 */ 105 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than"; 106 107 private static final String INSTRUMENTATION_GROUP = "recovery"; 108 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions"; 109 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions"; 110 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions"; 111 112 113 /** 114 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the 115 * queuing of commands. 116 */ 117 static class RecoveryRunnable implements Runnable { 118 private final long olderThan; 119 private final long coordOlderThan; 120 private final long bundleOlderThan; 121 private long delay = 0; 122 private List<XCallable<?>> callables; 123 private List<XCallable<?>> delayedCallables; 124 private StringBuilder msg = null; 125 private JPAService jpaService = null; 126 127 public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) { 128 this.olderThan = olderThan; 129 this.coordOlderThan = coordOlderThan; 130 this.bundleOlderThan = bundleOlderThan; 131 } 132 133 public void run() { 134 XLog.Info.get().clear(); 135 XLog log = XLog.getLog(getClass()); 136 msg = new StringBuilder(); 137 jpaService = Services.get().get(JPAService.class); 138 runWFRecovery(); 139 runCoordActionRecovery(); 140 runCoordActionRecoveryForReady(); 141 runBundleRecovery(); 142 log.debug("QUEUING [{0}] for potential recovery", msg.toString()); 143 boolean ret = false; 144 if (null != callables) { 145 ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 146 if (ret == false) { 147 log.warn("Unable to queue the callables commands for RecoveryService. " 148 + "Most possibly command queue is full. Queue size is :" 149 + Services.get().get(CallableQueueService.class).queueSize()); 150 } 151 callables = null; 152 } 153 if (null != delayedCallables) { 154 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 155 if (ret == false) { 156 log.warn("Unable to queue the delayedCallables commands for RecoveryService. " 157 + "Most possibly Callable queue is full. Queue size is :" 158 + Services.get().get(CallableQueueService.class).queueSize()); 159 } 160 delayedCallables = null; 161 this.delay = 0; 162 } 163 } 164 165 private void runBundleRecovery(){ 166 XLog.Info.get().clear(); 167 XLog log = XLog.getLog(getClass()); 168 List<BundleActionBean> bactions = null; 169 try { 170 bactions = BundleActionQueryExecutor.getInstance().getList( 171 BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan); 172 } 173 catch (JPAExecutorException ex) { 174 log.warn("Error reading bundle actions from database", ex); 175 return; 176 } 177 msg.append(", BUNDLE_ACTIONS : " + bactions.size()); 178 for (BundleActionBean baction : bactions) { 179 try { 180 Services.get().get(InstrumentationService.class).get() 181 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1); 182 if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) { 183 log.error("CoordId is null for Bundle action " + baction.getBundleActionId()); 184 continue; 185 } 186 if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) { 187 if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) { 188 BundleJobBean bundleJob = null; 189 if (jpaService != null) { 190 bundleJob = BundleJobQueryExecutor.getInstance().get( 191 BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId()); 192 } 193 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 194 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 195 for (Element coordElem : coordElems) { 196 Attribute name = coordElem.getAttribute("name"); 197 if (name.getValue().equals(baction.getCoordName())) { 198 Configuration coordConf = mergeConfig(coordElem, bundleJob); 199 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); 200 queueCallable(new CoordSubmitXCommand(coordConf, 201 bundleJob.getId(), name.getValue())); 202 } 203 } 204 } 205 else if (baction.getStatus() == Job.Status.KILLED) { 206 queueCallable(new CoordKillXCommand(baction.getCoordId())); 207 } 208 else if (baction.getStatus() == Job.Status.SUSPENDED 209 || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) { 210 queueCallable(new CoordSuspendXCommand(baction.getCoordId())); 211 } 212 else if (baction.getStatus() == Job.Status.RUNNING 213 || baction.getStatus() == Job.Status.RUNNINGWITHERROR) { 214 queueCallable(new CoordResumeXCommand(baction.getCoordId())); 215 } 216 } 217 } 218 catch (Exception ex) { 219 log.error("Exception, {0}", ex.getMessage(), ex); 220 } 221 } 222 223 } 224 225 /** 226 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long 227 */ 228 private void runCoordActionRecovery() { 229 XLog.Info.get().clear(); 230 XLog log = XLog.getLog(getClass()); 231 long pushMissingDepInterval = Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_INTERVAL, 200); 232 long pushMissingDepDelay = pushMissingDepInterval; 233 List<CoordinatorActionBean> cactions = null; 234 try { 235 cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan)); 236 } 237 catch (JPAExecutorException ex) { 238 log.warn("Error reading coord actions from database", ex); 239 return; 240 } 241 msg.append(", COORD_ACTIONS : " + cactions.size()); 242 for (CoordinatorActionBean caction : cactions) { 243 try { 244 if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) { 245 Services.get().get(InstrumentationService.class).get() 246 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1); 247 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) { 248 queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId())); 249 log.info("Recover a WAITING coord action and resubmit CoordActionInputCheckXCommand :" 250 + caction.getId()); 251 if (caction.getPushMissingDependencies() != null 252 && caction.getPushMissingDependencies().length() != 0) { 253 queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true), 254 pushMissingDepDelay); 255 pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval; 256 log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :" 257 + caction.getId()); 258 } 259 } 260 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) { 261 CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( 262 CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId()); 263 queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), 264 coordJob.getAppName(), caction.getJobId())); 265 266 log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" 267 + caction.getId()); 268 } 269 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) { 270 if (caction.getExternalId() != null && caction.getPending() > 1) { 271 queueCallable(new SuspendXCommand(caction.getExternalId())); 272 log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" 273 + caction.getId()); 274 } 275 } 276 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) { 277 if (caction.getExternalId() != null) { 278 queueCallable(new KillXCommand(caction.getExternalId())); 279 log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId()); 280 } 281 } 282 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) { 283 if (caction.getExternalId() != null) { 284 queueCallable(new ResumeXCommand(caction.getExternalId())); 285 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId()); 286 } 287 } 288 } 289 } 290 catch (Exception ex) { 291 log.error("Exception, {0}", ex.getMessage(), ex); 292 } 293 } 294 295 296 } 297 298 /** 299 * Recover coordinator actions that are staying in READY too long 300 */ 301 private void runCoordActionRecoveryForReady() { 302 XLog.Info.get().clear(); 303 XLog log = XLog.getLog(getClass()); 304 305 try { 306 List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan)); 307 jobids = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(jobids); 308 msg.append(", COORD_READY_JOBS : " + jobids.size()); 309 for (String jobid : jobids) { 310 queueCallable(new CoordActionReadyXCommand(jobid)); 311 312 log.info("Recover READY coord actions for jobid :" + jobid); 313 } 314 } 315 catch (Exception ex) { 316 log.error("Exception, {0}", ex.getMessage(), ex); 317 } 318 } 319 320 /** 321 * Recover wf actions 322 */ 323 private void runWFRecovery() { 324 XLog.Info.get().clear(); 325 XLog log = XLog.getLog(getClass()); 326 // queue command for action recovery 327 List<WorkflowActionBean> actions = null; 328 try { 329 actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS, 330 olderThan); 331 } 332 catch (JPAExecutorException ex) { 333 log.warn("Exception while reading pending actions from storage", ex); 334 return; 335 } 336 // log.debug("QUEUING[{0}] pending wf actions for potential recovery", 337 // actions.size()); 338 msg.append(" WF_ACTIONS " + actions.size()); 339 340 for (WorkflowActionBean action : actions) { 341 try { 342 if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) { 343 Services.get().get(InstrumentationService.class).get() 344 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1); 345 if (action.getStatus() == WorkflowActionBean.Status.PREP 346 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 347 queueCallable(new ActionStartXCommand(action.getId(), action.getType())); 348 } 349 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) { 350 Date nextRunTime = action.getPendingAge(); 351 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime() 352 - System.currentTimeMillis()); 353 } 354 else if (action.getStatus() == WorkflowActionBean.Status.DONE 355 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 356 queueCallable(new ActionEndXCommand(action.getId(), action.getType())); 357 } 358 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) { 359 Date nextRunTime = action.getPendingAge(); 360 queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime() 361 - System.currentTimeMillis()); 362 363 } 364 else if (action.getStatus() == WorkflowActionBean.Status.OK 365 || action.getStatus() == WorkflowActionBean.Status.ERROR) { 366 queueCallable(new SignalXCommand(action.getJobId(), action.getId())); 367 } 368 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 369 queueCallable(new ActionStartXCommand(action.getId(), action.getType())); 370 } 371 } 372 } 373 catch (Exception ex) { 374 log.error("Exception, {0}", ex.getMessage(), ex); 375 } 376 } 377 378 } 379 380 /** 381 * Adds callables to a list. If the number of callables in the list reaches {@link 382 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset. 383 * 384 * @param callable the callable to queue. 385 */ 386 private void queueCallable(XCallable<?> callable) { 387 if (callables == null) { 388 callables = new ArrayList<XCallable<?>>(); 389 } 390 callables.add(callable); 391 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 392 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 393 if (ret == false) { 394 XLog.getLog(getClass()).warn( 395 "Unable to queue the callables commands for RecoveryService. " 396 + "Most possibly command queue is full. Queue size is :" 397 + Services.get().get(CallableQueueService.class).queueSize()); 398 } 399 callables = new ArrayList<XCallable<?>>(); 400 } 401 } 402 403 /** 404 * Adds callables to a list. If the number of callables in the list reaches {@link 405 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay 406 * of the callables in the list. The callables list and the delay is reset. 407 * 408 * @param callable the callable to queue. 409 * @param delay the delay for the callable. 410 */ 411 private void queueCallable(XCallable<?> callable, long delay) { 412 if (delayedCallables == null) { 413 delayedCallables = new ArrayList<XCallable<?>>(); 414 } 415 this.delay = Math.max(this.delay, delay); 416 delayedCallables.add(callable); 417 if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 418 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 419 if (ret == false) { 420 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. " 421 + "Most possibly Callable queue is full. Queue size is :" 422 + Services.get().get(CallableQueueService.class).queueSize()); 423 } 424 delayedCallables = new ArrayList<XCallable<?>>(); 425 this.delay = 0; 426 } 427 } 428 } 429 430 /** 431 * Initializes the RecoveryService. 432 * 433 * @param services services instance. 434 */ 435 @Override 436 public void init(Services services) { 437 Configuration conf = services.getConf(); 438 Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt( 439 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600)); 440 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf), 441 SchedulerService.Unit.SEC); 442 } 443 444 public int getRecoveryServiceInterval(Configuration conf){ 445 return conf.getInt(CONF_SERVICE_INTERVAL, 60); 446 } 447 448 /** 449 * Destroy the Recovery Service. 450 */ 451 @Override 452 public void destroy() { 453 } 454 455 /** 456 * Return the public interface for the Recovery Service. 457 * 458 * @return {@link RecoveryService}. 459 */ 460 @Override 461 public Class<? extends Service> getInterface() { 462 return RecoveryService.class; 463 } 464 465 /** 466 * Merge Bundle job config and the configuration from the coord job to pass 467 * to Coord Engine 468 * 469 * @param coordElem the coordinator configuration 470 * @return Configuration merged configuration 471 * @throws CommandException thrown if failed to merge configuration 472 */ 473 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException { 474 XLog.Info.get().clear(); 475 XLog log = XLog.getLog("RecoveryService"); 476 477 String jobConf = bundleJob.getConf(); 478 // Step 1: runConf = jobConf 479 Configuration runConf = null; 480 try { 481 runConf = new XConfiguration(new StringReader(jobConf)); 482 } 483 catch (IOException e1) { 484 log.warn("Configuration parse error in:" + jobConf); 485 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 486 } 487 // Step 2: Merge local properties into runConf 488 // extract 'property' tags under 'configuration' block in the coordElem 489 // convert Element to XConfiguration 490 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 491 492 if (localConfigElement != null) { 493 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 494 Configuration localConf; 495 try { 496 localConf = new XConfiguration(new StringReader(strConfig)); 497 } 498 catch (IOException e1) { 499 log.warn("Configuration parse error in:" + strConfig); 500 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 501 } 502 503 // copy configuration properties in the coordElem to the runConf 504 XConfiguration.copy(localConf, runConf); 505 } 506 507 // Step 3: Extract value of 'app-path' in coordElem, save it as a 508 // new property called 'oozie.coord.application.path', and normalize. 509 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 510 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 511 // Normalize coordinator appPath here; 512 try { 513 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 514 } 515 catch (IOException e) { 516 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 517 } 518 return runConf; 519 } 520}