001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Set; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.Path; 030 import org.apache.oozie.CoordinatorActionBean; 031 import org.apache.oozie.CoordinatorActionInfo; 032 import org.apache.oozie.CoordinatorJobBean; 033 import org.apache.oozie.ErrorCode; 034 import org.apache.oozie.XException; 035 import org.apache.oozie.action.ActionExecutorException; 036 import org.apache.oozie.action.hadoop.FsActionExecutor; 037 import org.apache.oozie.client.CoordinatorAction; 038 import org.apache.oozie.client.CoordinatorJob; 039 import org.apache.oozie.client.Job; 040 import org.apache.oozie.client.SLAEvent.SlaAppType; 041 import org.apache.oozie.client.rest.RestConstants; 042 import org.apache.oozie.command.CommandException; 043 import org.apache.oozie.command.PreconditionException; 044 import org.apache.oozie.command.RerunTransitionXCommand; 045 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 046 import org.apache.oozie.coord.CoordELFunctions; 047 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 048 import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor; 049 import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor; 050 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 051 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 052 import org.apache.oozie.executor.jpa.JPAExecutorException; 053 import org.apache.oozie.service.JPAService; 054 import org.apache.oozie.service.Services; 055 import org.apache.oozie.util.DateUtils; 056 import org.apache.oozie.util.InstrumentUtils; 057 import org.apache.oozie.util.LogUtils; 058 import org.apache.oozie.util.ParamChecker; 059 import org.apache.oozie.util.StatusUtils; 060 import org.apache.oozie.util.XConfiguration; 061 import org.apache.oozie.util.XLog; 062 import org.apache.oozie.util.XmlUtils; 063 import org.apache.oozie.util.db.SLADbOperations; 064 import org.jdom.Element; 065 import org.jdom.JDOMException; 066 067 /** 068 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup. 069 * <p/> 070 * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or 071 * {@link RestConstants.JOB_COORD_RERUN_ACTION}. 072 * <p/> 073 * The "refresh" is used to indicate if user wants to refresh an action's input and output events. 074 * <p/> 075 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 076 */ 077 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> { 078 079 private String rerunType; 080 private String scope; 081 private boolean refresh; 082 private boolean noCleanup; 083 private CoordinatorJobBean coordJob = null; 084 private JPAService jpaService = null; 085 protected boolean prevPending; 086 087 /** 088 * The constructor for class {@link CoordRerunXCommand} 089 * 090 * @param jobId the job id 091 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION} 092 * @param scope the rerun scope for given rerunType separated by "," 093 * @param refresh true if user wants to refresh input/output dataset urls 094 * @param noCleanup false if user wants to cleanup output events for given rerun actions 095 */ 096 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) { 097 super("coord_rerun", "coord_rerun", 1); 098 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 099 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType"); 100 this.scope = ParamChecker.notEmpty(scope, "scope"); 101 this.refresh = refresh; 102 this.noCleanup = noCleanup; 103 } 104 105 /** 106 * Get the list of actions for given id ranges 107 * 108 * @param jobId coordinator job id 109 * @param scope the id range to rerun separated by "," 110 * @return the list of all actions to rerun 111 * @throws CommandException thrown if failed to get coordinator actions by given id range 112 */ 113 private List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException { 114 ParamChecker.notEmpty(jobId, "jobId"); 115 ParamChecker.notEmpty(scope, "scope"); 116 117 Set<String> actions = new HashSet<String>(); 118 String[] list = scope.split(","); 119 for (String s : list) { 120 s = s.trim(); 121 if (s.contains("-")) { 122 String[] range = s.split("-"); 123 if (range.length != 2) { 124 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 125 } 126 int start; 127 int end; 128 try { 129 start = Integer.parseInt(range[0].trim()); 130 end = Integer.parseInt(range[1].trim()); 131 if (start > end) { 132 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 133 } 134 } 135 catch (NumberFormatException ne) { 136 throw new CommandException(ErrorCode.E0302, ne); 137 } 138 for (int i = start; i <= end; i++) { 139 actions.add(jobId + "@" + i); 140 } 141 } 142 else { 143 try { 144 Integer.parseInt(s); 145 } 146 catch (NumberFormatException ne) { 147 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 148 + "'. Integer only."); 149 } 150 actions.add(jobId + "@" + s); 151 } 152 } 153 154 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 155 for (String id : actions) { 156 CoordinatorActionBean coordAction; 157 try { 158 coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id)); 159 } 160 catch (JPAExecutorException je) { 161 throw new CommandException(je); 162 } 163 coordActions.add(coordAction); 164 LOG.debug("Rerun coordinator for actionId='" + id + "'"); 165 } 166 return coordActions; 167 } 168 169 /** 170 * Get the list of actions for given date ranges 171 * 172 * @param jobId coordinator job id 173 * @param scope the date range to rerun separated by "," 174 * @return the list of dates to rerun 175 * @throws CommandException thrown if failed to get coordinator actions by given date range 176 */ 177 private List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope) throws CommandException { 178 ParamChecker.notEmpty(jobId, "jobId"); 179 ParamChecker.notEmpty(scope, "scope"); 180 181 Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>(); 182 String[] list = scope.split(","); 183 for (String s : list) { 184 s = s.trim(); 185 if (s.contains("::")) { 186 String[] dateRange = s.split("::"); 187 if (dateRange.length != 2) { 188 throw new CommandException(ErrorCode.E0302, "format is wrong for date's range '" + s + "'"); 189 } 190 Date start; 191 Date end; 192 try { 193 start = DateUtils.parseDateUTC(dateRange[0].trim()); 194 end = DateUtils.parseDateUTC(dateRange[1].trim()); 195 if (start.after(end)) { 196 throw new CommandException(ErrorCode.E0302, "start date is older than end date: '" + s + "'"); 197 } 198 } 199 catch (Exception e) { 200 throw new CommandException(ErrorCode.E0302, e); 201 } 202 203 List<CoordinatorActionBean> listOfActions = getActionIdsFromDateRange(jobId, start, end); 204 actionSet.addAll(listOfActions); 205 } 206 else { 207 try { 208 Date date = DateUtils.parseDateUTC(s.trim()); 209 CoordinatorActionBean coordAction = jpaService 210 .execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date)); 211 actionSet.add(coordAction); 212 } 213 catch (JPAExecutorException e) { 214 throw new CommandException(e); 215 } 216 catch (Exception e) { 217 throw new CommandException(ErrorCode.E0302, e); 218 } 219 } 220 } 221 222 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 223 for (CoordinatorActionBean coordAction : actionSet) { 224 coordActions.add(coordAction); 225 LOG.debug("Rerun coordinator for actionId='" + coordAction.getId() + "'"); 226 } 227 return coordActions; 228 } 229 230 /** 231 * Get coordinator action ids between given start and end time 232 * 233 * @param jobId coordinator job id 234 * @param start start time 235 * @param end end time 236 * @return a list of coordinator actions belong to the range of start and end time 237 * @throws CommandException thrown if failed to get coordinator actions 238 */ 239 private List<CoordinatorActionBean> getActionIdsFromDateRange(String jobId, Date start, Date end) 240 throws CommandException { 241 List<CoordinatorActionBean> list; 242 try { 243 list = jpaService.execute(new CoordJobGetActionsForDatesJPAExecutor(jobId, start, end)); 244 } 245 catch (JPAExecutorException je) { 246 throw new CommandException(je); 247 } 248 return list; 249 } 250 251 /** 252 * Check if all given actions are eligible to rerun. 253 * 254 * @param actions list of CoordinatorActionBean 255 * @return true if all actions are eligible to rerun 256 */ 257 private boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) { 258 boolean ret = false; 259 for (CoordinatorActionBean coordAction : coordActions) { 260 ret = true; 261 if (!coordAction.isTerminalStatus()) { 262 ret = false; 263 break; 264 } 265 } 266 return ret; 267 } 268 269 /** 270 * Cleanup output-events directories 271 * 272 * @param eAction coordinator action xml 273 * @param user user name 274 * @param group group name 275 */ 276 @SuppressWarnings("unchecked") 277 private void cleanupOutputEvents(Element eAction, String user, String group) { 278 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 279 if (outputList != null) { 280 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 281 if (data.getChild("uris", data.getNamespace()) != null) { 282 String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); 283 if (uris != null) { 284 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); 285 FsActionExecutor fsAe = new FsActionExecutor(); 286 for (String uri : uriArr) { 287 Path path = new Path(uri); 288 try { 289 fsAe.delete(user, group, path); 290 LOG.debug("Cleanup the output dir " + path); 291 } 292 catch (ActionExecutorException ae) { 293 LOG.warn("Failed to cleanup the output dir " + uri, ae); 294 } 295 } 296 } 297 298 } 299 } 300 } 301 else { 302 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); 303 } 304 } 305 306 /** 307 * Refresh an action's input and ouput events. 308 * 309 * @param coordJob coordinator job bean 310 * @param coordAction coordinator action bean 311 * @throws Exception thrown if failed to materialize coordinator action 312 */ 313 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception { 314 Configuration jobConf = null; 315 try { 316 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 317 } 318 catch (IOException ioe) { 319 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 320 throw new CommandException(ErrorCode.E1005, ioe); 321 } 322 String jobXml = coordJob.getJobXml(); 323 Element eJob = XmlUtils.parseXml(jobXml); 324 Date actualTime = new Date(); 325 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction 326 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction); 327 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml=" 328 + XmlUtils.prettyPrint(actionXml).toString()); 329 coordAction.setActionXml(actionXml); 330 } 331 332 /** 333 * Update an action into database table 334 * 335 * @param coordJob coordinator job bean 336 * @param coordAction coordinator action bean 337 * @param actionXml coordinator action xml 338 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event 339 */ 340 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml) 341 throws Exception { 342 LOG.debug("updateAction for actionId=" + coordAction.getId()); 343 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) { 344 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId()); 345 coordAction.setCreatedTime(new Date()); 346 } 347 coordAction.setStatus(CoordinatorAction.Status.WAITING); 348 coordAction.setExternalId(""); 349 coordAction.setExternalStatus(""); 350 coordAction.setRerunTime(new Date()); 351 coordAction.setLastModifiedTime(new Date()); 352 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction)); 353 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup()); 354 } 355 356 /** 357 * Create SLA RegistrationEvent 358 * 359 * @param actionXml action xml 360 * @param actionBean coordinator action bean 361 * @param user user name 362 * @param group group name 363 * @throws Exception thrown if unable to write sla registration event 364 */ 365 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group) 366 throws Exception { 367 Element eAction = XmlUtils.parseXml(actionXml); 368 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 369 SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group, 370 LOG); 371 } 372 373 /* (non-Javadoc) 374 * @see org.apache.oozie.command.XCommand#getEntityKey() 375 */ 376 @Override 377 protected String getEntityKey() { 378 return jobId; 379 } 380 381 /* (non-Javadoc) 382 * @see org.apache.oozie.command.XCommand#isLockRequired() 383 */ 384 @Override 385 protected boolean isLockRequired() { 386 return true; 387 } 388 389 /* (non-Javadoc) 390 * @see org.apache.oozie.command.XCommand#loadState() 391 */ 392 @Override 393 protected void loadState() throws CommandException { 394 jpaService = Services.get().get(JPAService.class); 395 if (jpaService == null) { 396 throw new CommandException(ErrorCode.E0610); 397 } 398 try { 399 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 400 prevPending = coordJob.isPending(); 401 } 402 catch (JPAExecutorException je) { 403 throw new CommandException(je); 404 } 405 LogUtils.setLogInfo(coordJob, logInfo); 406 } 407 408 /* (non-Javadoc) 409 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 410 */ 411 @Override 412 protected void verifyPrecondition() throws CommandException, PreconditionException { 413 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED 414 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 415 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 416 throw new CommandException(ErrorCode.E1018, 417 "coordinator job is killed or failed so all actions are not eligible to rerun!"); 418 } 419 420 // no actioins have been created for PREP job 421 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 422 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 423 throw new CommandException(ErrorCode.E1018, 424 "coordinator job is PREP so no actions are materialized to rerun!"); 425 } 426 } 427 428 @Override 429 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 430 verifyPrecondition(); 431 } 432 433 @Override 434 public void rerunChildren() throws CommandException { 435 boolean isError = false; 436 try { 437 CoordinatorActionInfo coordInfo = null; 438 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 439 List<CoordinatorActionBean> coordActions; 440 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) { 441 coordActions = getCoordActionsFromDates(jobId, scope); 442 } 443 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) { 444 coordActions = getCoordActionsFromIds(jobId, scope); 445 } 446 else { 447 isError = true; 448 throw new CommandException(ErrorCode.E1018, "date or action expected."); 449 } 450 if (checkAllActionsRunnable(coordActions)) { 451 for (CoordinatorActionBean coordAction : coordActions) { 452 String actionXml = coordAction.getActionXml(); 453 if (!noCleanup) { 454 Element eAction = XmlUtils.parseXml(actionXml); 455 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup()); 456 } 457 if (refresh) { 458 refreshAction(coordJob, coordAction); 459 } 460 updateAction(coordJob, coordAction, actionXml); 461 462 queue(new CoordActionNotificationXCommand(coordAction), 100); 463 queue(new CoordActionInputCheckXCommand(coordAction.getId()), 100); 464 } 465 } 466 else { 467 isError = true; 468 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!"); 469 } 470 coordInfo = new CoordinatorActionInfo(coordActions); 471 472 ret = coordInfo; 473 } 474 catch (XException xex) { 475 isError = true; 476 throw new CommandException(xex); 477 } 478 catch (JDOMException jex) { 479 isError = true; 480 throw new CommandException(ErrorCode.E0700, jex); 481 } 482 catch (Exception ex) { 483 isError = true; 484 throw new CommandException(ErrorCode.E1018, ex); 485 } 486 finally{ 487 if(isError){ 488 transitToPrevious(); 489 } 490 } 491 } 492 493 /* 494 * (non-Javadoc) 495 * @see org.apache.oozie.command.TransitionXCommand#getJob() 496 */ 497 @Override 498 public Job getJob() { 499 return coordJob; 500 } 501 502 @Override 503 public void notifyParent() throws CommandException { 504 //update bundle action 505 if (getPrevStatus() != null && coordJob.getBundleId() != null) { 506 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus()); 507 bundleStatusUpdate.call(); 508 } 509 } 510 511 @Override 512 public void updateJob() throws CommandException { 513 try { 514 // rerun a paused coordinator job will keep job status at paused and pending at previous pending 515 if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) { 516 coordJob.setStatus(Job.Status.PAUSED); 517 if (prevPending) { 518 coordJob.setPending(); 519 } else { 520 coordJob.resetPending(); 521 } 522 } 523 524 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 525 } 526 catch (JPAExecutorException je) { 527 throw new CommandException(je); 528 } 529 } 530 531 /* (non-Javadoc) 532 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 533 */ 534 @Override 535 public XLog getLog() { 536 return LOG; 537 } 538 539 @Override 540 public final void transitToNext() { 541 prevStatus = coordJob.getStatus(); 542 coordJob.setStatus(Job.Status.RUNNING); 543 // used for backward support of coordinator 0.1 schema 544 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus)); 545 coordJob.setPending(); 546 } 547 548 private final void transitToPrevious() throws CommandException { 549 coordJob.setStatus(getPrevStatus()); 550 if (!prevPending) { 551 coordJob.resetPending(); 552 } 553 else { 554 coordJob.setPending(); 555 } 556 } 557 }