001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.List; 025 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.BundleActionBean; 028 import org.apache.oozie.BundleJobBean; 029 import org.apache.oozie.CoordinatorActionBean; 030 import org.apache.oozie.CoordinatorJobBean; 031 import org.apache.oozie.ErrorCode; 032 import org.apache.oozie.WorkflowActionBean; 033 import org.apache.oozie.client.Job; 034 import org.apache.oozie.client.OozieClient; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.coord.CoordActionInputCheckCommand; 037 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand; 038 import org.apache.oozie.command.coord.CoordActionReadyCommand; 039 import org.apache.oozie.command.coord.CoordActionReadyXCommand; 040 import org.apache.oozie.command.coord.CoordActionStartCommand; 041 import org.apache.oozie.command.coord.CoordActionStartXCommand; 042 import org.apache.oozie.command.coord.CoordKillXCommand; 043 import org.apache.oozie.command.coord.CoordResumeXCommand; 044 import org.apache.oozie.command.coord.CoordSubmitXCommand; 045 import org.apache.oozie.command.coord.CoordSuspendXCommand; 046 import org.apache.oozie.command.wf.ActionEndCommand; 047 import org.apache.oozie.command.wf.ActionEndXCommand; 048 import org.apache.oozie.command.wf.ActionStartCommand; 049 import org.apache.oozie.command.wf.ActionStartXCommand; 050 import org.apache.oozie.command.wf.KillXCommand; 051 import org.apache.oozie.command.wf.ResumeXCommand; 052 import org.apache.oozie.command.wf.SignalCommand; 053 import org.apache.oozie.command.wf.SignalXCommand; 054 import org.apache.oozie.command.wf.SuspendXCommand; 055 import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor; 056 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 057 import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor; 058 import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor; 059 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 060 import org.apache.oozie.executor.jpa.JPAExecutorException; 061 import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor; 062 import org.apache.oozie.util.JobUtils; 063 import org.apache.oozie.util.XCallable; 064 import org.apache.oozie.util.XConfiguration; 065 import org.apache.oozie.util.XLog; 066 import org.apache.oozie.util.XmlUtils; 067 import org.jdom.Attribute; 068 import org.jdom.Element; 069 import org.jdom.JDOMException; 070 071 /** 072 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then 073 * queues them for execution. 074 */ 075 public class RecoveryService implements Service { 076 077 public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService."; 078 public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions."; 079 public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord."; 080 public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle."; 081 /** 082 * Time interval, in seconds, at which the recovery service will be scheduled to run. 083 */ 084 public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval"; 085 /** 086 * The number of callables to be queued in a batch. 087 */ 088 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 089 /** 090 * Age of actions to queue, in seconds. 091 */ 092 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than"; 093 /** 094 * Age of coordinator jobs to recover, in seconds. 095 */ 096 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than"; 097 098 /** 099 * Age of Bundle jobs to recover, in seconds. 100 */ 101 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than"; 102 103 private static final String INSTRUMENTATION_GROUP = "recovery"; 104 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions"; 105 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions"; 106 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions"; 107 108 private static boolean useXCommand = true; 109 110 111 /** 112 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the 113 * queuing of commands. 114 */ 115 static class RecoveryRunnable implements Runnable { 116 private final long olderThan; 117 private final long coordOlderThan; 118 private final long bundleOlderThan; 119 private long delay = 0; 120 private List<XCallable<?>> callables; 121 private List<XCallable<?>> delayedCallables; 122 private StringBuilder msg = null; 123 private JPAService jpaService = null; 124 125 public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) { 126 this.olderThan = olderThan; 127 this.coordOlderThan = coordOlderThan; 128 this.bundleOlderThan = bundleOlderThan; 129 } 130 131 public void run() { 132 XLog.Info.get().clear(); 133 XLog log = XLog.getLog(getClass()); 134 msg = new StringBuilder(); 135 jpaService = Services.get().get(JPAService.class); 136 runWFRecovery(); 137 runCoordActionRecovery(); 138 runCoordActionRecoveryForReady(); 139 runBundleRecovery(); 140 log.debug("QUEUING [{0}] for potential recovery", msg.toString()); 141 boolean ret = false; 142 if (null != callables) { 143 ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 144 if (ret == false) { 145 log.warn("Unable to queue the callables commands for RecoveryService. " 146 + "Most possibly command queue is full. Queue size is :" 147 + Services.get().get(CallableQueueService.class).queueSize()); 148 } 149 callables = null; 150 } 151 if (null != delayedCallables) { 152 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 153 if (ret == false) { 154 log.warn("Unable to queue the delayedCallables commands for RecoveryService. " 155 + "Most possibly Callable queue is full. Queue size is :" 156 + Services.get().get(CallableQueueService.class).queueSize()); 157 } 158 delayedCallables = null; 159 this.delay = 0; 160 } 161 } 162 163 private void runBundleRecovery(){ 164 XLog.Info.get().clear(); 165 XLog log = XLog.getLog(getClass()); 166 167 try { 168 List<BundleActionBean> bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan)); 169 msg.append(", BUNDLE_ACTIONS : " + bactions.size()); 170 for (BundleActionBean baction : bactions) { 171 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 172 INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1); 173 if(baction.getStatus() == Job.Status.PREP){ 174 BundleJobBean bundleJob = null; 175 try { 176 if (jpaService != null) { 177 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId())); 178 } 179 if(bundleJob != null){ 180 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 181 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 182 for (Element coordElem : coordElems) { 183 Attribute name = coordElem.getAttribute("name"); 184 if (name.getValue().equals(baction.getCoordName())) { 185 Configuration coordConf = mergeConfig(coordElem,bundleJob); 186 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); 187 queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue())); 188 } 189 } 190 } 191 } 192 catch (JDOMException jex) { 193 throw new CommandException(ErrorCode.E1301, jex); 194 } 195 catch (JPAExecutorException je) { 196 throw new CommandException(je); 197 } 198 } 199 else if(baction.getStatus() == Job.Status.KILLED){ 200 queueCallable(new CoordKillXCommand(baction.getCoordId())); 201 } 202 else if(baction.getStatus() == Job.Status.SUSPENDED){ 203 queueCallable(new CoordSuspendXCommand(baction.getCoordId())); 204 } 205 else if(baction.getStatus() == Job.Status.RUNNING){ 206 queueCallable(new CoordResumeXCommand(baction.getCoordId())); 207 } 208 } 209 } 210 catch (Exception ex) { 211 log.error("Exception, {0}", ex.getMessage(), ex); 212 } 213 } 214 215 /** 216 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long 217 */ 218 private void runCoordActionRecovery() { 219 XLog.Info.get().clear(); 220 XLog log = XLog.getLog(getClass()); 221 222 try { 223 List<CoordinatorActionBean> cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan)); 224 msg.append(", COORD_ACTIONS : " + cactions.size()); 225 for (CoordinatorActionBean caction : cactions) { 226 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 227 INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1); 228 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) { 229 if (useXCommand) { 230 queueCallable(new CoordActionInputCheckXCommand(caction.getId())); 231 } else { 232 queueCallable(new CoordActionInputCheckCommand(caction.getId())); 233 } 234 235 log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :" + caction.getId()); 236 } 237 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) { 238 CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId())); 239 240 if (useXCommand) { 241 queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob 242 .getAuthToken())); 243 } else { 244 queueCallable(new CoordActionStartCommand(caction.getId(), coordJob.getUser(), coordJob 245 .getAuthToken())); 246 } 247 248 log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" + caction.getId()); 249 } 250 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) { 251 if (caction.getExternalId() != null) { 252 queueCallable(new SuspendXCommand(caction.getExternalId())); 253 log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" + caction.getId()); 254 } 255 } 256 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) { 257 if (caction.getExternalId() != null) { 258 queueCallable(new KillXCommand(caction.getExternalId())); 259 log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId()); 260 } 261 } 262 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) { 263 if (caction.getExternalId() != null) { 264 queueCallable(new ResumeXCommand(caction.getExternalId())); 265 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId()); 266 } 267 } 268 } 269 } 270 catch (Exception ex) { 271 log.error("Exception, {0}", ex.getMessage(), ex); 272 } 273 } 274 275 /** 276 * Recover coordinator actions that are staying in READY too long 277 */ 278 private void runCoordActionRecoveryForReady() { 279 XLog.Info.get().clear(); 280 XLog log = XLog.getLog(getClass()); 281 282 try { 283 List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan)); 284 msg.append(", COORD_READY_JOBS : " + jobids.size()); 285 for (String jobid : jobids) { 286 if (useXCommand) { 287 queueCallable(new CoordActionReadyXCommand(jobid)); 288 } else { 289 queueCallable(new CoordActionReadyCommand(jobid)); 290 } 291 292 log.info("Recover READY coord actions for jobid :" + jobid); 293 } 294 } 295 catch (Exception ex) { 296 log.error("Exception, {0}", ex.getMessage(), ex); 297 } 298 } 299 300 /** 301 * Recover wf actions 302 */ 303 private void runWFRecovery() { 304 XLog.Info.get().clear(); 305 XLog log = XLog.getLog(getClass()); 306 // queue command for action recovery 307 try { 308 List<WorkflowActionBean> actions = null; 309 try { 310 actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan)); 311 } 312 catch (JPAExecutorException ex) { 313 log.warn("Exception while reading pending actions from storage", ex); 314 } 315 //log.debug("QUEUING[{0}] pending wf actions for potential recovery", actions.size()); 316 msg.append(" WF_ACTIONS " + actions.size()); 317 318 for (WorkflowActionBean action : actions) { 319 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 320 INSTR_RECOVERED_ACTIONS_COUNTER, 1); 321 if (action.getStatus() == WorkflowActionBean.Status.PREP 322 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 323 324 if (useXCommand) { 325 queueCallable(new ActionStartXCommand(action.getId(), action.getType())); 326 } else { 327 queueCallable(new ActionStartCommand(action.getId(), action.getType())); 328 } 329 330 } 331 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) { 332 Date nextRunTime = action.getPendingAge(); 333 if (useXCommand) { 334 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime() 335 - System.currentTimeMillis()); 336 } else { 337 queueCallable(new ActionStartCommand(action.getId(), action.getType()), nextRunTime.getTime() 338 - System.currentTimeMillis()); 339 } 340 341 } 342 else if (action.getStatus() == WorkflowActionBean.Status.DONE 343 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 344 if (useXCommand) { 345 queueCallable(new ActionEndXCommand(action.getId(), action.getType())); 346 } else { 347 queueCallable(new ActionEndCommand(action.getId(), action.getType())); 348 } 349 350 } 351 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) { 352 Date nextRunTime = action.getPendingAge(); 353 if (useXCommand) { 354 queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime() 355 - System.currentTimeMillis()); 356 } else { 357 queueCallable(new ActionEndCommand(action.getId(), action.getType()), nextRunTime.getTime() 358 - System.currentTimeMillis()); 359 } 360 361 } 362 else if (action.getStatus() == WorkflowActionBean.Status.OK 363 || action.getStatus() == WorkflowActionBean.Status.ERROR) { 364 if (useXCommand) { 365 queueCallable(new SignalXCommand(action.getJobId(), action.getId())); 366 } else { 367 queueCallable(new SignalCommand(action.getJobId(), action.getId())); 368 } 369 370 } 371 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 372 queueCallable(new ActionStartXCommand(action.getId(), action.getType())); 373 } 374 } 375 } 376 catch (Exception ex) { 377 log.error("Exception, {0}", ex.getMessage(), ex); 378 } 379 } 380 381 /** 382 * Adds callables to a list. If the number of callables in the list reaches {@link 383 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset. 384 * 385 * @param callable the callable to queue. 386 */ 387 private void queueCallable(XCallable<?> callable) { 388 if (callables == null) { 389 callables = new ArrayList<XCallable<?>>(); 390 } 391 callables.add(callable); 392 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 393 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 394 if (ret == false) { 395 XLog.getLog(getClass()).warn( 396 "Unable to queue the callables commands for RecoveryService. " 397 + "Most possibly command queue is full. Queue size is :" 398 + Services.get().get(CallableQueueService.class).queueSize()); 399 } 400 callables = new ArrayList<XCallable<?>>(); 401 } 402 } 403 404 /** 405 * Adds callables to a list. If the number of callables in the list reaches {@link 406 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay 407 * of the callables in the list. The callables list and the delay is reset. 408 * 409 * @param callable the callable to queue. 410 * @param delay the delay for the callable. 411 */ 412 private void queueCallable(XCallable<?> callable, long delay) { 413 if (delayedCallables == null) { 414 delayedCallables = new ArrayList<XCallable<?>>(); 415 } 416 this.delay = Math.max(this.delay, delay); 417 delayedCallables.add(callable); 418 if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 419 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 420 if (ret == false) { 421 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. " 422 + "Most possibly Callable queue is full. Queue size is :" 423 + Services.get().get(CallableQueueService.class).queueSize()); 424 } 425 delayedCallables = new ArrayList<XCallable<?>>(); 426 this.delay = 0; 427 } 428 } 429 } 430 431 /** 432 * Initializes the RecoveryService. 433 * 434 * @param services services instance. 435 */ 436 @Override 437 public void init(Services services) { 438 Configuration conf = services.getConf(); 439 Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt( 440 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600)); 441 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600), 442 SchedulerService.Unit.SEC); 443 444 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) { 445 useXCommand = false; 446 } 447 } 448 449 /** 450 * Destroy the Recovery Service. 451 */ 452 @Override 453 public void destroy() { 454 } 455 456 /** 457 * Return the public interface for the Recovery Service. 458 * 459 * @return {@link RecoveryService}. 460 */ 461 @Override 462 public Class<? extends Service> getInterface() { 463 return RecoveryService.class; 464 } 465 466 /** 467 * Merge Bundle job config and the configuration from the coord job to pass 468 * to Coord Engine 469 * 470 * @param coordElem the coordinator configuration 471 * @return Configuration merged configuration 472 * @throws CommandException thrown if failed to merge configuration 473 */ 474 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException { 475 XLog.Info.get().clear(); 476 XLog log = XLog.getLog("RecoveryService"); 477 478 String jobConf = bundleJob.getConf(); 479 // Step 1: runConf = jobConf 480 Configuration runConf = null; 481 try { 482 runConf = new XConfiguration(new StringReader(jobConf)); 483 } 484 catch (IOException e1) { 485 log.warn("Configuration parse error in:" + jobConf); 486 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 487 } 488 // Step 2: Merge local properties into runConf 489 // extract 'property' tags under 'configuration' block in the coordElem 490 // convert Element to XConfiguration 491 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 492 493 if (localConfigElement != null) { 494 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 495 Configuration localConf; 496 try { 497 localConf = new XConfiguration(new StringReader(strConfig)); 498 } 499 catch (IOException e1) { 500 log.warn("Configuration parse error in:" + strConfig); 501 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 502 } 503 504 // copy configuration properties in the coordElem to the runConf 505 XConfiguration.copy(localConf, runConf); 506 } 507 508 // Step 3: Extract value of 'app-path' in coordElem, save it as a 509 // new property called 'oozie.coord.application.path', and normalize. 510 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 511 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 512 // Normalize coordinator appPath here; 513 try { 514 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 515 } 516 catch (IOException e) { 517 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 518 } 519 return runConf; 520 } 521 }