001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.sql.Timestamp; 024import java.util.ArrayList; 025import java.util.Date; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Set; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.oozie.BundleActionBean; 032import org.apache.oozie.BundleJobBean; 033import org.apache.oozie.CoordinatorActionBean; 034import org.apache.oozie.CoordinatorJobBean; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.WorkflowActionBean; 037import org.apache.oozie.client.Job; 038import org.apache.oozie.client.OozieClient; 039import org.apache.oozie.command.CommandException; 040import org.apache.oozie.command.bundle.BundleCoordSubmitXCommand; 041import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 042import org.apache.oozie.command.coord.CoordActionInputCheckXCommand; 043import org.apache.oozie.command.coord.CoordActionReadyXCommand; 044import org.apache.oozie.command.coord.CoordActionStartXCommand; 045import org.apache.oozie.command.coord.CoordKillXCommand; 046import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand; 047import org.apache.oozie.command.coord.CoordResumeXCommand; 048import org.apache.oozie.command.coord.CoordSuspendXCommand; 049import org.apache.oozie.command.wf.ActionEndXCommand; 050import org.apache.oozie.command.wf.ActionStartXCommand; 051import org.apache.oozie.command.wf.KillXCommand; 052import org.apache.oozie.command.wf.ResumeXCommand; 053import org.apache.oozie.command.wf.SignalXCommand; 054import org.apache.oozie.command.wf.SuspendXCommand; 055import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 056import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 057import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 058import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 059import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 060import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 061import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 062import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 063import org.apache.oozie.executor.jpa.JPAExecutorException; 064import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 065import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 066import org.apache.oozie.util.ELUtils; 067import org.apache.oozie.util.JobUtils; 068import org.apache.oozie.util.XCallable; 069import org.apache.oozie.util.XConfiguration; 070import org.apache.oozie.util.XLog; 071import org.apache.oozie.util.XmlUtils; 072import org.jdom.Attribute; 073import org.jdom.Element; 074 075/** 076 * The Recovery Service checks for pending actions and premater coordinator jobs older than a configured age and then 077 * queues them for execution. 078 */ 079public class RecoveryService implements Service { 080 081 public static final String RECOVERY_SERVICE_CONF_PREFIX = Service.CONF_PREFIX + "RecoveryService."; 082 public static final String CONF_PREFIX_WF_ACTIONS = RECOVERY_SERVICE_CONF_PREFIX + "wf.actions."; 083 public static final String CONF_PREFIX_COORD = RECOVERY_SERVICE_CONF_PREFIX + "coord."; 084 public static final String CONF_PREFIX_BUNDLE = RECOVERY_SERVICE_CONF_PREFIX + "bundle."; 085 /** 086 * Time interval, in seconds, at which the recovery service will be scheduled to run. 087 */ 088 public static final String CONF_SERVICE_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "interval"; 089 /** 090 * The number of callables to be queued in a batch. 091 */ 092 public static final String CONF_CALLABLE_BATCH_SIZE = RECOVERY_SERVICE_CONF_PREFIX + "callable.batch.size"; 093 094 /** 095 * Delay for the push missing dependencies in milliseconds. 096 */ 097 public static final String CONF_PUSH_DEPENDENCY_INTERVAL = RECOVERY_SERVICE_CONF_PREFIX + "push.dependency.interval"; 098 099 /** 100 * Age of actions to queue, in seconds. 101 */ 102 public static final String CONF_WF_ACTIONS_OLDER_THAN = CONF_PREFIX_WF_ACTIONS + "older.than"; 103 104 public static final String CONF_WF_ACTIONS_CREATED_TIME_INTERVAL = CONF_PREFIX_WF_ACTIONS + "created.time.interval"; 105 106 /** 107 * Age of coordinator jobs to recover, in seconds. 108 */ 109 public static final String CONF_COORD_OLDER_THAN = CONF_PREFIX_COORD + "older.than"; 110 111 /** 112 * Age of Bundle jobs to recover, in seconds. 113 */ 114 public static final String CONF_BUNDLE_OLDER_THAN = CONF_PREFIX_BUNDLE + "older.than"; 115 116 private static final String INSTRUMENTATION_GROUP = "recovery"; 117 private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions"; 118 private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions"; 119 private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions"; 120 121 public static final long ONE_DAY_MILLISCONDS = 25 * 60 * 60 * 1000; 122 123 124 125 /** 126 * RecoveryRunnable is the Runnable which is scheduled to run with the configured interval, and takes care of the 127 * queuing of commands. 128 */ 129 static class RecoveryRunnable implements Runnable { 130 private final long olderThan; 131 private final long coordOlderThan; 132 private final long bundleOlderThan; 133 private long delay = 0; 134 private List<XCallable<?>> callables; 135 private List<XCallable<?>> delayedCallables; 136 private StringBuilder msg = null; 137 private JPAService jpaService = null; 138 139 public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) { 140 this.olderThan = olderThan; 141 this.coordOlderThan = coordOlderThan; 142 this.bundleOlderThan = bundleOlderThan; 143 } 144 145 public void run() { 146 XLog.Info.get().clear(); 147 XLog log = XLog.getLog(getClass()); 148 msg = new StringBuilder(); 149 jpaService = Services.get().get(JPAService.class); 150 runWFRecovery(); 151 runCoordActionRecovery(); 152 runBundleRecovery(); 153 log.debug("QUEUED [{0}] for potential recovery", msg.toString()); 154 boolean ret = false; 155 if (null != callables) { 156 ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 157 if (ret == false) { 158 log.warn("Unable to queue the callables commands for RecoveryService. " 159 + "Most possibly command queue is full. Queue size is :" 160 + Services.get().get(CallableQueueService.class).queueSize()); 161 } 162 callables = null; 163 } 164 if (null != delayedCallables) { 165 ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 166 if (ret == false) { 167 log.warn("Unable to queue the delayedCallables commands for RecoveryService. " 168 + "Most possibly Callable queue is full. Queue size is :" 169 + Services.get().get(CallableQueueService.class).queueSize()); 170 } 171 delayedCallables = null; 172 this.delay = 0; 173 } 174 } 175 176 private void runBundleRecovery(){ 177 XLog.Info.get().clear(); 178 XLog log = XLog.getLog(getClass()); 179 List<BundleActionBean> bactions = null; 180 try { 181 bactions = BundleActionQueryExecutor.getInstance().getList( 182 BundleActionQuery.GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN, bundleOlderThan); 183 } 184 catch (JPAExecutorException ex) { 185 log.warn("Error reading bundle actions from database", ex); 186 return; 187 } 188 msg.append(", BUNDLE_ACTIONS : ").append(bactions.size()); 189 for (BundleActionBean baction : bactions) { 190 try { 191 Services.get().get(InstrumentationService.class).get() 192 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1); 193 if (baction.getCoordId() == null && baction.getStatus() != Job.Status.PREP) { 194 log.error("CoordId is null for Bundle action " + baction.getBundleActionId()); 195 continue; 196 } 197 if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(baction.getBundleId())) { 198 if (baction.getStatus() == Job.Status.PREP && baction.getCoordId() == null) { 199 200 CoordinatorJobBean coordJobs = CoordJobQueryExecutor.getInstance().getIfExist( 201 CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, baction.getCoordName(), 202 baction.getBundleId()); 203 204 if (coordJobs == null) { 205 log.debug("Coord [{0}] for bundle [{1}] is not yet submitted , submitting new one", 206 baction.getCoordName(), baction.getBundleId()); 207 208 BundleJobBean bundleJob = null; 209 if (jpaService != null) { 210 bundleJob = BundleJobQueryExecutor.getInstance().get( 211 BundleJobQuery.GET_BUNDLE_JOB_ID_JOBXML_CONF, baction.getBundleId()); 212 } 213 Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml()); 214 @SuppressWarnings("unchecked") 215 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 216 for (Element coordElem : coordElems) { 217 Attribute name = coordElem.getAttribute("name"); 218 String coordName = name.getValue(); 219 Configuration coordConf = mergeConfig(coordElem, bundleJob); 220 try { 221 coordName = ELUtils.resolveAppName(coordName, coordConf); 222 } 223 catch (Exception e) { 224 log.error("Error evaluating coord name " + e.getMessage(), e); 225 continue; 226 } 227 if (coordName.equals(baction.getCoordName())) { 228 coordConf.set(OozieClient.BUNDLE_ID, baction.getBundleId()); 229 queueCallable(new BundleCoordSubmitXCommand(coordConf, bundleJob.getId(), 230 coordName)); 231 } 232 } 233 } 234 else { 235 log.debug( 236 "Coord [{0}] for bundle [{1}] is submitted , but bundle action is not updated.", 237 baction.getCoordName(), baction.getBundleId()); 238 coordJobs = CoordJobQueryExecutor.getInstance().getIfExist( 239 CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, baction.getCoordName(), 240 coordJobs.getId()); 241 queueCallable(new BundleStatusUpdateXCommand(coordJobs, baction.getStatus())); 242 } 243 } 244 else if (baction.getStatus() == Job.Status.KILLED) { 245 queueCallable(new CoordKillXCommand(baction.getCoordId())); 246 } 247 else if (baction.getStatus() == Job.Status.SUSPENDED 248 || baction.getStatus() == Job.Status.SUSPENDEDWITHERROR) { 249 queueCallable(new CoordSuspendXCommand(baction.getCoordId())); 250 } 251 else if (baction.getStatus() == Job.Status.RUNNING 252 || baction.getStatus() == Job.Status.RUNNINGWITHERROR) { 253 queueCallable(new CoordResumeXCommand(baction.getCoordId())); 254 } 255 } 256 } 257 catch (Exception ex) { 258 log.error("Exception, {0}", ex.getMessage(), ex); 259 } 260 } 261 262 } 263 264 /** 265 * Recover coordinator actions that are staying in WAITING or SUBMITTED too long 266 */ 267 private void runCoordActionRecovery() { 268 Set<String> readyJobs = new HashSet<String>(); 269 XLog.Info.get().clear(); 270 XLog log = XLog.getLog(getClass()); 271 long pushMissingDepInterval = ConfigurationService.getLong(CONF_PUSH_DEPENDENCY_INTERVAL); 272 long pushMissingDepDelay = pushMissingDepInterval; 273 Timestamp ts = new Timestamp(System.currentTimeMillis() - this.coordOlderThan * 1000); 274 275 List<CoordinatorActionBean> cactions = new ArrayList<CoordinatorActionBean>(); 276 try { 277 cactions.addAll(CoordActionQueryExecutor.getInstance().getList( 278 CoordActionQuery.GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, ts)); 279 cactions.addAll(CoordActionQueryExecutor.getInstance().getList( 280 CoordActionQuery.GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, ts)); 281 282 } 283 catch (JPAExecutorException ex) { 284 log.warn("Error reading coord actions from database", ex); 285 return; 286 } 287 msg.append(", COORD_ACTIONS : " + cactions.size()); 288 for (CoordinatorActionBean caction : cactions) { 289 try { 290 if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(caction.getId())) { 291 Services.get().get(InstrumentationService.class).get() 292 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1); 293 if (caction.getStatus() == CoordinatorActionBean.Status.WAITING) { 294 queueCallable(new CoordActionInputCheckXCommand(caction.getId(), caction.getJobId())); 295 log.debug("Recover a coord action from [WAITING] and resubmit CoordActionInputCheckXCommand :[{0}]" 296 , caction.getId()); 297 if (caction.getPushMissingDependencies() != null 298 && caction.getPushMissingDependencies().length() != 0) { 299 queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true), 300 pushMissingDepDelay); 301 pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval; 302 log.debug("Recover a coord action from [WAITING] and resubmit CoordPushDependencyCheckX :[{0}]" 303 , caction.getId()); 304 } 305 } 306 else if (caction.getStatus() == CoordinatorActionBean.Status.SUBMITTED) { 307 CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( 308 CoordJobQuery.GET_COORD_JOB_USER_APPNAME, caction.getJobId()); 309 queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), 310 coordJob.getAppName(), caction.getJobId())); 311 log.debug("Recover a coord action from [SUBMITTED] and resubmit CoordActionStartCommand :[{0}]", 312 caction.getId()); 313 } 314 else if (caction.getStatus() == CoordinatorActionBean.Status.SUSPENDED) { 315 if (caction.getExternalId() != null && caction.getPending() > 1) { 316 queueCallable(new SuspendXCommand(caction.getExternalId())); 317 log.debug("Recover a coord action from [SUSPENDED] and resubmit SuspendXCommand :[{0}]" 318 , caction.getId()); 319 } 320 } 321 else if (caction.getStatus() == CoordinatorActionBean.Status.KILLED) { 322 if (caction.getExternalId() != null) { 323 queueCallable(new KillXCommand(caction.getExternalId())); 324 log.debug("Recover a coord action from [KILLED] and resubmit KillXCommand :[{0}]" 325 , caction.getId()); 326 } 327 } 328 else if (caction.getStatus() == CoordinatorActionBean.Status.RUNNING) { 329 if (caction.getExternalId() != null) { 330 queueCallable(new ResumeXCommand(caction.getExternalId())); 331 log.debug("Recover a coord action from [RUNNING] and resubmit ResumeXCommand :[{0}]" 332 , caction.getId()); 333 } 334 } 335 else if (caction.getStatus() == CoordinatorActionBean.Status.READY) { 336 readyJobs.add(caction.getJobId()); 337 } 338 } 339 } 340 catch (Exception ex) { 341 log.error("Exception, {0}", ex.getMessage(), ex); 342 } 343 } 344 runCoordActionRecoveryForReady(readyJobs); 345 } 346 347 /** 348 * Recover coordinator actions that are staying in READY too long 349 */ 350 private void runCoordActionRecoveryForReady(Set<String> jobIds) { 351 XLog.Info.get().clear(); 352 XLog log = XLog.getLog(getClass()); 353 List<String> coordJobIds = new ArrayList<String>(jobIds); 354 try { 355 coordJobIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(coordJobIds); 356 msg.append(", COORD_READY_JOBS : " + coordJobIds.size()); 357 for (String jobid : coordJobIds) { 358 queueCallable(new CoordActionReadyXCommand(jobid)); 359 log.debug("Recover a coord action from [READY] resubmit CoordActionReadyXCommand :[{0}]", jobid); 360 } 361 } 362 catch (Exception ex) { 363 log.error("Exception, {0}", ex.getMessage(), ex); 364 } 365 } 366 367 /** 368 * Recover wf actions 369 */ 370 private void runWFRecovery() { 371 XLog.Info.get().clear(); 372 XLog log = XLog.getLog(getClass()); 373 // queue command for action recovery 374 375 long createdTimeInterval = new Date().getTime() - ConfigurationService.getLong(CONF_WF_ACTIONS_CREATED_TIME_INTERVAL) 376 * ONE_DAY_MILLISCONDS; 377 378 List<WorkflowActionBean> actions = null; 379 try { 380 actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_PENDING_ACTIONS, 381 olderThan, createdTimeInterval); 382 } 383 catch (JPAExecutorException ex) { 384 log.warn("Exception while reading pending actions from storage", ex); 385 return; 386 } 387 // log.debug("QUEUING[{0}] pending wf actions for potential recovery", 388 // actions.size()); 389 msg.append(" WF_ACTIONS " + actions.size()); 390 391 for (WorkflowActionBean action : actions) { 392 try { 393 if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(action.getId())) { 394 Services.get().get(InstrumentationService.class).get() 395 .incr(INSTRUMENTATION_GROUP, INSTR_RECOVERED_ACTIONS_COUNTER, 1); 396 if (action.getStatus() == WorkflowActionBean.Status.PREP 397 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) { 398 queueCallable(new ActionStartXCommand(action.getId(), action.getType())); 399 log.debug("Recover a workflow action from [{0}] status and resubmit ActionStartXCommand :[{1}]", 400 action.getStatus(), action.getId()); 401 } 402 else if (action.getStatus() == WorkflowActionBean.Status.START_RETRY) { 403 Date nextRunTime = action.getPendingAge(); 404 queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime() 405 - System.currentTimeMillis()); 406 log.debug("Recover a workflow action from [START_RETRY] status and resubmit ActionStartXCommand :[{0}]" 407 , action.getId()); 408 } 409 else if (action.getStatus() == WorkflowActionBean.Status.DONE 410 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 411 queueCallable(new ActionEndXCommand(action.getId(), action.getType())); 412 log.debug("Recover a workflow action from [{0}] status and resubmit ActionEndXCommand :[{1}]", 413 action.getStatus(), action.getId()); 414 } 415 else if (action.getStatus() == WorkflowActionBean.Status.END_RETRY) { 416 Date nextRunTime = action.getPendingAge(); 417 queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime() 418 - System.currentTimeMillis()); 419 log.debug("Recover a workflow action from [END_RETRY] status and resubmit ActionEndXCommand :[{0}]", 420 action.getId()); 421 } 422 else if (action.getStatus() == WorkflowActionBean.Status.OK 423 || action.getStatus() == WorkflowActionBean.Status.ERROR) { 424 queueCallable(new SignalXCommand(action.getJobId(), action.getId())); 425 log.debug("Recover a workflow action from [{0}] status and resubmit SignalXCommand :[{1}]", 426 action.getStatus(), action.getId()); 427 } 428 else if (action.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 429 queueCallable(new ActionStartXCommand(action.getId(), action.getType())); 430 log.debug("Recover a workflow action from [USER_RETRY] status and resubmit ActionStartXCommand :[{0}]" 431 , action.getId()); 432 } 433 } 434 } 435 catch (Exception ex) { 436 log.error("Exception, {0}", ex.getMessage(), ex); 437 } 438 } 439 440 } 441 442 /** 443 * Adds callables to a list. If the number of callables in the list reaches {@link 444 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list is reset. 445 * 446 * @param callable the callable to queue. 447 */ 448 private void queueCallable(XCallable<?> callable) { 449 if (callables == null) { 450 callables = new ArrayList<XCallable<?>>(); 451 } 452 callables.add(callable); 453 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) { 454 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); 455 if (ret == false) { 456 XLog.getLog(getClass()).warn( 457 "Unable to queue the callables commands for RecoveryService. " 458 + "Most possibly command queue is full. Queue size is :" 459 + Services.get().get(CallableQueueService.class).queueSize()); 460 } 461 callables = new ArrayList<XCallable<?>>(); 462 } 463 } 464 465 /** 466 * Adds callables to a list. If the number of callables in the list reaches {@link 467 * RecoveryService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued with the delay set to the maximum delay 468 * of the callables in the list. The callables list and the delay is reset. 469 * 470 * @param callable the callable to queue. 471 * @param delay the delay for the callable. 472 */ 473 private void queueCallable(XCallable<?> callable, long delay) { 474 if (delayedCallables == null) { 475 delayedCallables = new ArrayList<XCallable<?>>(); 476 } 477 this.delay = Math.max(this.delay, delay); 478 delayedCallables.add(callable); 479 if (delayedCallables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)){ 480 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); 481 if (ret == false) { 482 XLog.getLog(getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. " 483 + "Most possibly Callable queue is full. Queue size is :" 484 + Services.get().get(CallableQueueService.class).queueSize()); 485 } 486 delayedCallables = new ArrayList<XCallable<?>>(); 487 this.delay = 0; 488 } 489 } 490 } 491 492 /** 493 * Initializes the RecoveryService. 494 * 495 * @param services services instance. 496 */ 497 @Override 498 public void init(Services services) { 499 Configuration conf = services.getConf(); 500 Runnable recoveryRunnable = new RecoveryRunnable( 501 ConfigurationService.getInt(conf, CONF_WF_ACTIONS_OLDER_THAN), 502 ConfigurationService.getInt(conf, CONF_COORD_OLDER_THAN), 503 ConfigurationService.getInt(conf, CONF_BUNDLE_OLDER_THAN)); 504 services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf), 505 SchedulerService.Unit.SEC); 506 } 507 508 public int getRecoveryServiceInterval(Configuration conf){ 509 return ConfigurationService.getInt(conf, CONF_SERVICE_INTERVAL); 510 } 511 512 /** 513 * Destroy the Recovery Service. 514 */ 515 @Override 516 public void destroy() { 517 } 518 519 /** 520 * Return the public interface for the Recovery Service. 521 * 522 * @return {@link RecoveryService}. 523 */ 524 @Override 525 public Class<? extends Service> getInterface() { 526 return RecoveryService.class; 527 } 528 529 /** 530 * Merge Bundle job config and the configuration from the coord job to pass 531 * to Coord Engine 532 * 533 * @param coordElem the coordinator configuration 534 * @return Configuration merged configuration 535 * @throws CommandException thrown if failed to merge configuration 536 */ 537 private static Configuration mergeConfig(Element coordElem,BundleJobBean bundleJob) throws CommandException { 538 XLog.Info.get().clear(); 539 XLog log = XLog.getLog("RecoveryService"); 540 541 String jobConf = bundleJob.getConf(); 542 // Step 1: runConf = jobConf 543 Configuration runConf = null; 544 try { 545 runConf = new XConfiguration(new StringReader(jobConf)); 546 } 547 catch (IOException e1) { 548 log.warn("Configuration parse error in:" + jobConf); 549 throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1); 550 } 551 // Step 2: Merge local properties into runConf 552 // extract 'property' tags under 'configuration' block in the coordElem 553 // convert Element to XConfiguration 554 Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace()); 555 556 if (localConfigElement != null) { 557 String strConfig = XmlUtils.prettyPrint(localConfigElement).toString(); 558 Configuration localConf; 559 try { 560 localConf = new XConfiguration(new StringReader(strConfig)); 561 } 562 catch (IOException e1) { 563 log.warn("Configuration parse error in:" + strConfig); 564 throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1); 565 } 566 567 // copy configuration properties in the coordElem to the runConf 568 XConfiguration.copy(localConf, runConf); 569 } 570 571 // Step 3: Extract value of 'app-path' in coordElem, save it as a 572 // new property called 'oozie.coord.application.path', and normalize. 573 String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue(); 574 runConf.set(OozieClient.COORDINATOR_APP_PATH, appPath); 575 // Normalize coordinator appPath here; 576 try { 577 JobUtils.normalizeAppPath(runConf.get(OozieClient.USER_NAME), runConf.get(OozieClient.GROUP_NAME), runConf); 578 } 579 catch (IOException e) { 580 throw new CommandException(ErrorCode.E1001, runConf.get(OozieClient.COORDINATOR_APP_PATH)); 581 } 582 return runConf; 583 } 584}