001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.List; 025 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.BundleActionBean; 028 import org.apache.oozie.BundleJobBean; 029 import org.apache.oozie.CoordinatorActionBean; 030 import org.apache.oozie.CoordinatorJobBean; 031 import org.apache.oozie.ErrorCode; 032 import org.apache.oozie.WorkflowActionBean; 033 import org.apache.oozie.client.Job; 034 import org.apache.oozie.client.OozieClient; 035 import org.apache.oozie.command.CommandException; 036 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand; 037 import org.apache.oozie.command.coord.CoordActionReadyXCommand; 038 import org.apache.oozie.command.coord.CoordActionStartXCommand; 039 import org.apache.oozie.command.coord.CoordKillXCommand; 040 import org.apache.oozie.command.coord.CoordResumeXCommand; 041 import org.apache.oozie.command.coord.CoordSubmitXCommand; 042 import org.apache.oozie.command.coord.CoordSuspendXCommand; 043 import org.apache.oozie.command.wf.ActionEndXCommand; 044 import org.apache.oozie.command.wf.ActionStartXCommand; 045 import org.apache.oozie.command.wf.KillXCommand; 046 import org.apache.oozie.command.wf.ResumeXCommand; 047 import org.apache.oozie.command.wf.SignalXCommand; 048 import org.apache.oozie.command.wf.SuspendXCommand; 049 import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor; 050 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 051 import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor; 052 import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor; 053 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 054 import org.apache.oozie.executor.jpa.JPAExecutorException; 055 import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor; 056 import org.apache.oozie.util.JobUtils; 057 import org.apache.oozie.util.XCallable; 058 import org.apache.oozie.util.XConfiguration; 059 import org.apache.oozie.util.XLog; 060 import org.apache.oozie.util.XmlUtils; 061 import org.jdom.Attribute; 062 import org.jdom.Element; 063 import org.jdom.JDOMException; 064 065 /** 066 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then 067 * queues them for execution. 068 */ 069 public class RecoveryService implements Service { 070 071 public static final String CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService."; 072 public static final String CONF_PREFIX_WF_ACTIONS = Service.CONF_PREFIX + "wf.actions."; 073 public static final String CONF_PREFIX_COORD = Service.CONF_PREFIX + "coord."; 074 public static final String CONF_PREFIX_BUNDLE = Service.CONF_PREFIX + "bundle."; 075 /** 076 * Time interval, in seconds, at which the recovery service will be scheduled to run. 077 */ 078 public static final String CONF_SERVICE_INTERVAL = CONF_PREFIX + "interval"; 079 /** 080 * The number of callables to be queued in a batch. 081 */ 082 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; 083 /** 084 * Age of actions to queue, in seconds. 085 */ 086 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than"; 087 /** 088 * Age of coordinator jobs to recover, in seconds. 089 */ 090 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than"; 091 092 /** 093 * Age of Bundle jobs to recover, in seconds. 094 */ 095 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than"; 096 097 private static final String INSTRUMENTATION_GROUP = "recovery"; 098 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions"; 099 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions"; 100 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions"; 101 102 103 /** 104 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the 105 * queuing of commands. 106 */ 107 static class RecoveryRunnable implements Runnable { 108 private final long olderThan; 109 private final long coordOlderThan; 110 private final long bundleOlderThan; 111 private long delay = 0; 112 private List<XCallable<?>> callables; 113 private List<XCallable<?>> delayedCallables; 114 private StringBuilder msg = null; 115 private JPAService jpaService = null; 116 117 public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) { 118 this.olderThan = olderThan; 119 this.coordOlderThan = coordOlderThan; 120 this.bundleOlderThan = bundleOlderThan; 121 } 122 123 public void run() { 124 XLog.Info.get().clear(); 125 XLog log = XLog.getLog(getClass()); 126 msg = new StringBuilder(); 127 jpaService = Services.get().get(JPAService.class); 128 runWFRecovery(); 129 runCoordActionRecovery(); 130 runCoordActionRecoveryForReady(); 131 runBundleRecovery(); 132 log.debug("QUEUING [{0}] for potential recovery", msg.toString()); 133 boolean ret = false; 134 if (null != callables) { 135 ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 136 if (ret == false) { 137 log.warn("Unable to queue the callables commands for RecoveryService. " 138 + "Most possibly command queue is full. Queue size is :" 139 + Services.get().get(CallableQueueService.class).queueSize()); 140 } 141 callables = null; 142 } 143 if (null != delayedCallables) { 144 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 145 if (ret == false) { 146 log.warn("Unable to queue the delayedCallables commands for RecoveryService. " 147 + "Most possibly Callable queue is full. Queue size is :" 148 + Services.get().get(CallableQueueService.class).queueSize()); 149 } 150 delayedCallables = null; 151 this.delay = 0; 152 } 153 } 154 155 private void runBundleRecovery(){ 156 XLog.Info.get().clear(); 157 XLog log = XLog.getLog(getClass()); 158 159 try { 160 List<BundleActionBean> bactions = jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(bundleOlderThan)); 161 msg.append(", BUNDLE_ACTIONS : " + bactions.size()); 162 for (BundleActionBean baction : bactions) { 163 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 164 INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1); 165 if(baction.getStatus() == Job.Status.PREP){ 166 BundleJobBean bundleJob = null; 167 try { 168 if (jpaService != null) { 169 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId())); 170 } 171 if(bundleJob != null){ 172 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 173 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 174 for (Element coordElem : coordElems) { 175 Attribute name = coordElem.getAttribute("name"); 176 if (name.getValue().equals(baction.getCoordName())) { 177 Configuration coordConf = mergeConfig(coordElem,bundleJob); 178 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); 179 queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue())); 180 } 181 } 182 } 183 } 184 catch (JDOMException jex) { 185 throw new CommandException(ErrorCode.E1301, jex); 186 } 187 catch (JPAExecutorException je) { 188 throw new CommandException(je); 189 } 190 } 191 else if(baction.getStatus() == Job.Status.KILLED){ 192 queueCallable(new CoordKillXCommand(baction.getCoordId())); 193 } 194 else if(baction.getStatus() == Job.Status.SUSPENDED){ 195 queueCallable(new CoordSuspendXCommand(baction.getCoordId())); 196 } 197 else if(baction.getStatus() == Job.Status.RUNNING){ 198 queueCallable(new CoordResumeXCommand(baction.getCoordId())); 199 } 200 } 201 } 202 catch (Exception ex) { 203 log.error("Exception, {0}", ex.getMessage(), ex); 204 } 205 } 206 207 /** 208 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long 209 */ 210 private void runCoordActionRecovery() { 211 XLog.Info.get().clear(); 212 XLog log = XLog.getLog(getClass()); 213 214 try { 215 List<CoordinatorActionBean> cactions = jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(coordOlderThan)); 216 msg.append(", COORD_ACTIONS : " + cactions.size()); 217 for (CoordinatorActionBean caction : cactions) { 218 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 219 INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1); 220 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) { 221 queueCallable(new CoordActionInputCheckXCommand( 222 caction.getId(), caction.getJobId())); 223 224 log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :" 225 + caction.getId()); 226 } 227 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) { 228 CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId())); 229 queueCallable(new CoordActionStartXCommand( 230 caction.getId(), coordJob.getUser(), 231 coordJob.getAuthToken(), caction.getJobId())); 232 233 log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" + caction.getId()); 234 } 235 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) { 236 if (caction.getExternalId() != null) { 237 queueCallable(new SuspendXCommand(caction.getExternalId())); 238 log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" + caction.getId()); 239 } 240 } 241 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) { 242 if (caction.getExternalId() != null) { 243 queueCallable(new KillXCommand(caction.getExternalId())); 244 log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId()); 245 } 246 } 247 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) { 248 if (caction.getExternalId() != null) { 249 queueCallable(new ResumeXCommand(caction.getExternalId())); 250 log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId()); 251 } 252 } 253 } 254 } 255 catch (Exception ex) { 256 log.error("Exception, {0}", ex.getMessage(), ex); 257 } 258 } 259 260 /** 261 * Recover coordinator actions that are staying in READY too long 262 */ 263 private void runCoordActionRecoveryForReady() { 264 XLog.Info.get().clear(); 265 XLog log = XLog.getLog(getClass()); 266 267 try { 268 List<String> jobids = jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(coordOlderThan)); 269 msg.append(", COORD_READY_JOBS : " + jobids.size()); 270 for (String jobid : jobids) { 271 queueCallable(new CoordActionReadyXCommand(jobid)); 272 273 log.info("Recover READY coord actions for jobid :" + jobid); 274 } 275 } 276 catch (Exception ex) { 277 log.error("Exception, {0}", ex.getMessage(), ex); 278 } 279 } 280 281 /** 282 * Recover wf actions 283 */ 284 private void runWFRecovery() { 285 XLog.Info.get().clear(); 286 XLog log = XLog.getLog(getClass()); 287 // queue command for action recovery 288 try { 289 List<WorkflowActionBean> actions = null; 290 try { 291 actions = jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(olderThan)); 292 } 293 catch (JPAExecutorException ex) { 294 log.warn("Exception while reading pending actions from storage", ex); 295 } 296 //log.debug("QUEUING[{0}] pending wf actions for potential recovery", actions.size()); 297 msg.append(" WF_ACTIONS " + actions.size()); 298 299 for (WorkflowActionBean action : actions) { 300 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP, 301 INSTR_RECOVERED_ACTIONS_COUNTER, 1); 302 if (action.getStatus() == WorkflowActionBean.Status.PREP 303 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 304 queueCallable(new ActionStartXCommand(action.getId(), 305 action.getType())); 306 } 307 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) { 308 Date nextRunTime = action.getPendingAge(); 309 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime() 310 - System.currentTimeMillis()); 311 } 312 else if (action.getStatus() == WorkflowActionBean.Status.DONE 313 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 314 queueCallable(new ActionEndXCommand(action.getId(), action.getType())); 315 } 316 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) { 317 Date nextRunTime = action.getPendingAge(); 318 queueCallable(new ActionEndXCommand(action.getId(), 319 action.getType()), nextRunTime.getTime() 320 - System.currentTimeMillis()); 321 322 } 323 else if (action.getStatus() == WorkflowActionBean.Status.OK 324 || action.getStatus() == WorkflowActionBean.Status.ERROR) { 325 queueCallable(new SignalXCommand(action.getJobId(), 326 action.getId())); 327 } 328 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 329 queueCallable(new ActionStartXCommand(action.getId(), 330 action.getType())); 331 } 332 } 333 } 334 catch (Exception ex) { 335 log.error("Exception, {0}", ex.getMessage(), ex); 336 } 337 } 338 339 /** 340 * Adds callables to a list. If the number of callables in the list reaches {@link 341 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset. 342 * 343 * @param callable the callable to queue. 344 */ 345 private void queueCallable(XCallable<?> callable) { 346 if (callables == null) { 347 callables = new ArrayList<XCallable<?>>(); 348 } 349 callables.add(callable); 350 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 351 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 352 if (ret == false) { 353 XLog.getLog(getClass()).warn( 354 "Unable to queue the callables commands for RecoveryService. " 355 + "Most possibly command queue is full. Queue size is :" 356 + Services.get().get(CallableQueueService.class).queueSize()); 357 } 358 callables = new ArrayList<XCallable<?>>(); 359 } 360 } 361 362 /** 363 * Adds callables to a list. If the number of callables in the list reaches {@link 364 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay 365 * of the callables in the list. The callables list and the delay is reset. 366 * 367 * @param callable the callable to queue. 368 * @param delay the delay for the callable. 369 */ 370 private void queueCallable(XCallable<?> callable, long delay) { 371 if (delayedCallables == null) { 372 delayedCallables = new ArrayList<XCallable<?>>(); 373 } 374 this.delay = Math.max(this.delay, delay); 375 delayedCallables.add(callable); 376 if (delayedCallables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 377 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 378 if (ret == false) { 379 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. " 380 + "Most possibly Callable queue is full. Queue size is :" 381 + Services.get().get(CallableQueueService.class).queueSize()); 382 } 383 delayedCallables = new ArrayList<XCallable<?>>(); 384 this.delay = 0; 385 } 386 } 387 } 388 389 /** 390 * Initializes the RecoveryService. 391 * 392 * @param services services instance. 393 */ 394 @Override 395 public void init(Services services) { 396 Configuration conf = services.getConf(); 397 Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt( 398 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600)); 399 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600), 400 SchedulerService.Unit.SEC); 401 } 402 403 /** 404 * Destroy the Recovery Service. 405 */ 406 @Override 407 public void destroy() { 408 } 409 410 /** 411 * Return the public interface for the Recovery Service. 412 * 413 * @return {@link RecoveryService}. 414 */ 415 @Override 416 public Class<? extends Service> getInterface() { 417 return RecoveryService.class; 418 } 419 420 /** 421 * Merge Bundle job config and the configuration from the coord job to pass 422 * to Coord Engine 423 * 424 * @param coordElem the coordinator configuration 425 * @return Configuration merged configuration 426 * @throws CommandException thrown if failed to merge configuration 427 */ 428 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException { 429 XLog.Info.get().clear(); 430 XLog log = XLog.getLog("RecoveryService"); 431 432 String jobConf = bundleJob.getConf(); 433 // Step 1: runConf = jobConf 434 Configuration runConf = null; 435 try { 436 runConf = new XConfiguration(new StringReader(jobConf)); 437 } 438 catch (IOException e1) { 439 log.warn("Configuration parse error in:" + jobConf); 440 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 441 } 442 // Step 2: Merge local properties into runConf 443 // extract 'property' tags under 'configuration' block in the coordElem 444 // convert Element to XConfiguration 445 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 446 447 if (localConfigElement != null) { 448 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 449 Configuration localConf; 450 try { 451 localConf = new XConfiguration(new StringReader(strConfig)); 452 } 453 catch (IOException e1) { 454 log.warn("Configuration parse error in:" + strConfig); 455 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 456 } 457 458 // copy configuration properties in the coordElem to the runConf 459 XConfiguration.copy(localConf, runConf); 460 } 461 462 // Step 3: Extract value of 'app-path' in coordElem, save it as a 463 // new property called 'oozie.coord.application.path', and normalize. 464 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 465 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 466 // Normalize coordinator appPath here; 467 try { 468 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 469 } 470 catch (IOException e) { 471 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 472 } 473 return runConf; 474 } 475 }