/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.wf.ActionEndXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * The Recovery Service periodically looks up workflow actions, coordinator actions and bundle actions that are older
 * than a configured age and queues the command matching their current status so that they get another chance to
 * make progress.
 */
public class RecoveryService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService.";
    public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions.";
    public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord.";
    public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle.";
    /**
     * Time interval, in seconds, at which the recovery service will be scheduled to run.
     */
    public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval";
    /**
     * The number of callables to be queued in a batch.
     */
    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
    /**
     * Age of actions to queue, in seconds.
     */
    public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than";
    /**
     * Age of coordinator jobs to recover, in seconds.
     */
    public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than";

    /**
     * Age of Bundle jobs to recover, in seconds.
     */
    public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than";

    private static final String INSTRUMENTATION_GROUP = "recovery";
    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";


    /**
     * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the
     * queuing of commands.
     * <p>
     * Callables are accumulated in two lists (immediate and delayed) and handed to the {@link CallableQueueService}
     * in batches of {@link RecoveryService#CONF_CALLABLE_BATCH_SIZE}; whatever is left over is flushed at the end of
     * each run.
     */
    static class RecoveryRunnable implements Runnable {
        private final long olderThan;
        private final long coordOlderThan;
        private final long bundleOlderThan;
        /** Largest delay requested by any callable currently held in {@link #delayedCallables}. */
        private long delay = 0;
        private List<XCallable<?>> callables;
        private List<XCallable<?>> delayedCallables;
        /** Summary of how many candidates each recovery step found; logged once per run. */
        private StringBuilder msg = null;
        private JPAService jpaService = null;

        /**
         * Creates the runnable.
         *
         * @param olderThan age threshold passed to the pending workflow actions query.
         * @param coordOlderThan age threshold passed to the coordinator action queries.
         * @param bundleOlderThan age threshold passed to the bundle actions query.
         */
        public RecoveryRunnable(long olderThan, long coordOlderThan, long bundleOlderThan) {
            this.olderThan = olderThan;
            this.coordOlderThan = coordOlderThan;
            this.bundleOlderThan = bundleOlderThan;
        }

        /**
         * Runs the workflow, coordinator and bundle recovery steps in that order and then queues any callables
         * that did not fill a complete batch.
         */
        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            msg = new StringBuilder();
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                // Every recovery step starts with a JPA query; without the service there is nothing to do.
                log.warn("JPAService is not available, skipping recovery run");
                return;
            }
            runWFRecovery();
            runCoordActionRecovery();
            runCoordActionRecoveryForReady();
            runBundleRecovery();
            log.debug("QUEUING [{0}] for potential recovery", msg.toString());
            if (callables != null) {
                flushCallables();
                callables = null;
            }
            if (delayedCallables != null) {
                flushDelayedCallables();
                delayedCallables = null;
                this.delay = 0;
            }
        }

        /**
         * Recover bundle actions returned by {@link BundleActionsGetWaitingOlderJPAExecutor} by queuing the
         * coordinator command matching the bundle action status.
         */
        private void runBundleRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            List<BundleActionBean> bactions = null;
            try {
                bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Error reading bundle actions from database", ex);
                return;
            }
            msg.append(", BUNDLE_ACTIONS : " + bactions.size());
            for (BundleActionBean baction : bactions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1);
                    // PREP actions are recovered by submitting their coordinator, which does not use the
                    // coordinator id; every other status issues a command against the coordinator id, so a
                    // missing id is only an error for those.
                    if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) {
                        log.error("CoordId is null for Bundle action " + baction.getBundleActionId());
                        continue;
                    }
                    if (baction.getStatus() == Job.Status.PREP) {
                        BundleJobBean bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction
                                .getBundleId()));
                        if (bundleJob != null) {
                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
                            // JDOM returns a raw List here; the elements are Element instances.
                            @SuppressWarnings("unchecked")
                            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
                            for (Element coordElem : coordElems) {
                                Attribute name = coordElem.getAttribute("name");
                                // A coordinator element without a name cannot match; keep checking the others.
                                if (name != null && name.getValue().equals(baction.getCoordName())) {
                                    Configuration coordConf = mergeConfig(coordElem, bundleJob);
                                    coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId());
                                    queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(),
                                            bundleJob.getId(), name.getValue()));
                                }
                            }
                        }
                    }
                    else if (baction.getStatus() == Job.Status.KILLED) {
                        queueCallable(new CoordKillXCommand(baction.getCoordId()));
                    }
                    else if (baction.getStatus() == Job.Status.SUSPENDED
                            || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
                        queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
                    }
                    else if (baction.getStatus() == Job.Status.RUNNING
                            || baction.getStatus() == Job.Status.RUNNINGWITHERROR) {
                        queueCallable(new CoordResumeXCommand(baction.getCoordId()));
                    }
                }
                catch (Exception ex) {
                    // One failing bundle action must not prevent recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Recover coordinator actions returned by {@link CoordActionsGetForRecoveryJPAExecutor}: WAITING and
         * SUBMITTED actions are re-driven with coordinator commands, while SUSPENDED, KILLED and RUNNING actions
         * that have an external id get the corresponding workflow command queued for that external id.
         */
        private void runCoordActionRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            List<CoordinatorActionBean> cactions = null;
            try {
                cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Error reading coord actions from database", ex);
                return;
            }
            msg.append(", COORD_ACTIONS : " + cactions.size());
            for (CoordinatorActionBean caction : cactions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
                    if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) {
                        queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));

                        log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :"
                                + caction.getId());
                    }
                    else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) {
                        CoordinatorJobBean coordJob = jpaService
                                .execute(new CoordJobGetJPAExecutor(caction.getJobId()));
                        queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
                                coordJob.getAuthToken(), caction.getJobId()));

                        log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
                                + caction.getId());
                    }
                    else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) {
                        if (caction.getExternalId() != null) {
                            queueCallable(new SuspendXCommand(caction.getExternalId()));
                            log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :"
                                    + caction.getId());
                        }
                    }
                    else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) {
                        if (caction.getExternalId() != null) {
                            queueCallable(new KillXCommand(caction.getExternalId()));
                            log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
                        }
                    }
                    else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) {
                        if (caction.getExternalId() != null) {
                            queueCallable(new ResumeXCommand(caction.getExternalId()));
                            log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
                        }
                    }
                }
                catch (Exception ex) {
                    // One failing coordinator action must not prevent recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Recover coordinator actions that are staying in READY too long: one {@link CoordActionReadyXCommand} is
         * queued per job id returned by {@link CoordActionsGetReadyGroupbyJobIDJPAExecutor}.
         */
        private void runCoordActionRecoveryForReady() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());

            try {
                List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(
                        coordOlderThan));
                msg.append(", COORD_READY_JOBS : " + jobids.size());
                for (String jobid : jobids) {
                    queueCallable(new CoordActionReadyXCommand(jobid));

                    log.info("Recover READY coord actions for jobid :" + jobid);
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        /**
         * Recover workflow actions returned by {@link WorkflowActionsGetPendingJPAExecutor} by queuing the start,
         * end or signal command that matches the action status. Actions in a retry status are queued with a delay
         * computed from their pending age.
         */
        private void runWFRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(getClass());
            // queue command for action recovery
            List<WorkflowActionBean> actions = null;
            try {
                actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan));
            }
            catch (JPAExecutorException ex) {
                log.warn("Exception while reading pending actions from storage", ex);
                return;
            }
            msg.append(" WF_ACTIONS " + actions.size());

            for (WorkflowActionBean action : actions) {
                try {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1);
                    if (action.getStatus() == WorkflowActionBean.Status.PREP
                            || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                    }
                    else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) {
                        Date nextRunTime = action.getPendingAge();
                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()),
                                nextRunTime.getTime() - System.currentTimeMillis());
                    }
                    else if (action.getStatus() == WorkflowActionBean.Status.DONE
                            || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
                        queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
                    }
                    else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) {
                        Date nextRunTime = action.getPendingAge();
                        queueCallable(new ActionEndXCommand(action.getId(), action.getType()),
                                nextRunTime.getTime() - System.currentTimeMillis());
                    }
                    else if (action.getStatus() == WorkflowActionBean.Status.OK
                            || action.getStatus() == WorkflowActionBean.Status.ERROR) {
                        queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
                    }
                    else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
                        queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                    }
                }
                catch (Exception ex) {
                    // One failing workflow action must not prevent recovery of the remaining ones.
                    log.error("Exception, {0}", ex.getMessage(), ex);
                }
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset.
         *
         * @param callable the callable to queue.
         */
        private void queueCallable(XCallable<?> callable) {
            if (callables == null) {
                callables = new ArrayList<XCallable<?>>();
            }
            callables.add(callable);
            if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
                flushCallables();
                callables = new ArrayList<XCallable<?>>();
            }
        }

        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay
         * of the callables in the list. The callables list and the delay is reset.
         *
         * @param callable the callable to queue.
         * @param delay the delay for the callable.
         */
        private void queueCallable(XCallable<?> callable, long delay) {
            if (delayedCallables == null) {
                delayedCallables = new ArrayList<XCallable<?>>();
            }
            this.delay = Math.max(this.delay, delay);
            delayedCallables.add(callable);
            if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
                flushDelayedCallables();
                delayedCallables = new ArrayList<XCallable<?>>();
                this.delay = 0;
            }
        }

        /**
         * Hands the current immediate batch to the callable queue and logs a warning if it was rejected. The
         * caller is responsible for resetting {@link #callables} afterwards.
         */
        private void flushCallables() {
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(callables)) {
                XLog.getLog(getClass()).warn(
                        "Unable to queue the callables commands for RecoveryService. "
                                + "Most possibly command queue is full. Queue size is :"
                                + queueService.queueSize());
            }
        }

        /**
         * Hands the current delayed batch to the callable queue using the accumulated maximum delay and logs a
         * warning if it was rejected. The caller is responsible for resetting {@link #delayedCallables} and
         * {@link #delay} afterwards.
         */
        private void flushDelayedCallables() {
            CallableQueueService queueService = Services.get().get(CallableQueueService.class);
            if (!queueService.queueSerial(delayedCallables, this.delay)) {
                XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. "
                        + "Most possibly Callable queue is full. Queue size is :"
                        + queueService.queueSize());
            }
        }
    }

    /**
     * Initializes the RecoveryService by creating a {@link RecoveryRunnable} from the configured age thresholds and
     * registering it with the {@link SchedulerService} at the configured interval (in seconds).
     *
     * @param services services instance.
     */
    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120),
                conf.getInt(CONF_COORD_OLDER_THAN, 600), conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
                SchedulerService.Unit.SEC);
    }

    /**
     * Destroy the Recovery Service.
     */
    @Override
    public void destroy() {
    }

    /**
     * Return the public interface for the Recovery Service.
     *
     * @return {@link RecoveryService}.
     */
    @Override
    public Class<? extends Service> getInterface() {
        return RecoveryService.class;
    }

    /**
     * Merge Bundle job config and the configuration from the coord job to pass
     * to Coord Engine
     *
     * @param coordElem the coordinator element of the bundle application XML
     * @param bundleJob the bundle job whose configuration is the base of the merged configuration
     * @return Configuration merged configuration
     * @throws CommandException thrown if failed to merge configuration
     */
    private static Configuration mergeConfig(Element coordElem, BundleJobBean bundleJob) throws CommandException {
        XLog.Info.get().clear();
        XLog log = XLog.getLog("RecoveryService");

        String jobConf = bundleJob.getConf();
        // Step 1: runConf = jobConf
        Configuration runConf = null;
        try {
            runConf = new XConfiguration(new StringReader(jobConf));
        }
        catch (IOException e1) {
            log.warn("Configuration parse error in:" + jobConf);
            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
        }
        // Step 2: Merge local properties into runConf
        // extract 'property' tags under 'configuration' block in the coordElem
        // convert Element to XConfiguration
        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());

        if (localConfigElement != null) {
            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
            Configuration localConf;
            try {
                localConf = new XConfiguration(new StringReader(strConfig));
            }
            catch (IOException e1) {
                log.warn("Configuration parse error in:" + strConfig);
                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
            }

            // copy configuration properties in the coordElem to the runConf
            XConfiguration.copy(localConf, runConf);
        }

        // Step 3: Extract value of 'app-path' in coordElem, save it as a
        // new property called 'oozie.coord.application.path', and normalize.
        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
        runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
        // Normalize coordinator appPath here;
        try {
            JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf);
        }
        catch (IOException e) {
            // Pass the IOException along, like the other catch blocks of this method, so the cause is not lost.
            throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH), e);
        }
        return runConf;
    }
}