001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.Date; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Map.Entry; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.oozie.CoordinatorActionBean; 034import org.apache.oozie.CoordinatorActionInfo; 035import org.apache.oozie.CoordinatorJobBean; 036import org.apache.oozie.ErrorCode; 037import org.apache.oozie.SLAEventBean; 038import org.apache.oozie.XException; 039import org.apache.oozie.client.CoordinatorAction; 040import org.apache.oozie.client.CoordinatorJob; 041import org.apache.oozie.client.Job; 042import org.apache.oozie.client.SLAEvent.SlaAppType; 043import org.apache.oozie.client.rest.RestConstants; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.command.PreconditionException; 046import org.apache.oozie.command.RerunTransitionXCommand; 047import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 048import org.apache.oozie.coord.CoordELFunctions; 049import org.apache.oozie.coord.CoordUtils; 050import org.apache.oozie.dependency.URIHandler; 051import org.apache.oozie.dependency.URIHandler.Context; 052import org.apache.oozie.dependency.URIHandlerException; 053import org.apache.oozie.executor.jpa.BatchQueryExecutor; 054import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 055import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 056import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 057import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 058import org.apache.oozie.executor.jpa.JPAExecutorException; 059import org.apache.oozie.service.EventHandlerService; 060import org.apache.oozie.service.Services; 061import org.apache.oozie.service.URIHandlerService; 062import org.apache.oozie.sla.SLAOperations; 063import org.apache.oozie.sla.service.SLAService; 064import org.apache.oozie.util.InstrumentUtils; 065import org.apache.oozie.util.LogUtils; 066import org.apache.oozie.util.ParamChecker; 067import org.apache.oozie.util.StatusUtils; 068import org.apache.oozie.util.XConfiguration; 069import org.apache.oozie.util.XLog; 070import org.apache.oozie.util.XmlUtils; 071import org.apache.oozie.util.db.SLADbOperations; 072import org.jdom.Element; 073import org.jdom.JDOMException; 074 075/** 076 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup. 077 * <p> 078 * The "rerunType" can be set as {@link RestConstants#JOB_COORD_SCOPE_DATE} or {@link RestConstants#JOB_COORD_SCOPE_ACTION}. 079 * <p> 080 * The "refresh" is used to indicate if user wants to refresh an action's input and output events. 081 * <p> 082 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 083 */ 084@SuppressWarnings("deprecation") 085public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> { 086 087 public static final String RERUN_CONF = "rerunConf"; 088 private String rerunType; 089 private String scope; 090 private boolean refresh; 091 private boolean noCleanup; 092 private CoordinatorJobBean coordJob = null; 093 protected boolean prevPending; 094 private boolean failed; 095 private Configuration actionRunConf; 096 097 /** 098 * The constructor for class {@link CoordRerunXCommand} 099 * 100 * @param jobId the job id 101 * @param rerunType rerun type {@link RestConstants#JOB_COORD_SCOPE_DATE} or {@link RestConstants#JOB_COORD_SCOPE_ACTION} 102 * @param scope the rerun scope for given rerunType separated by "," 103 * @param refresh true if user wants to refresh input/output dataset urls 104 * @param noCleanup false if user wants to cleanup output events for given rerun actions 105 * @param failed true if user wants to rerun only failed nodes 106 * @param actionRunConf configuration values for actions 107 */ 108 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup, 109 boolean failed, Configuration actionRunConf) { 110 super("coord_rerun", "coord_rerun", 1); 111 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 112 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType"); 113 this.scope = ParamChecker.notEmpty(scope, "scope"); 114 this.refresh = refresh; 115 this.noCleanup = noCleanup; 116 this.failed = failed; 117 this.actionRunConf = actionRunConf; 118 } 119 120 /** 121 * Check if all given actions are eligible to rerun. 122 * 123 * @param coordActions list of CoordinatorActionBean 124 * @return true if all actions are eligible to rerun 125 */ 126 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) { 127 ParamChecker.notNull(coordActions, "Coord actions to be rerun"); 128 boolean ret = false; 129 for (CoordinatorActionBean coordAction : coordActions) { 130 ret = true; 131 if (!coordAction.isTerminalStatus()) { 132 ret = false; 133 break; 134 } 135 } 136 return ret; 137 } 138 139 /** 140 * Cleanup output-events directories 141 * 142 * @param eAction coordinator action xml 143 */ 144 @SuppressWarnings("unchecked") 145 private void cleanupOutputEvents(Element eAction, Configuration coordJobConf, Map<String, Context> uriHandlerContextMap) 146 throws CommandException { 147 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 148 if (outputList != null) { 149 150 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 151 String nocleanup = data.getAttributeValue("nocleanup"); 152 if (data.getChild("uris", data.getNamespace()) != null 153 && (nocleanup == null || !nocleanup.equals("true"))) { 154 String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); 155 if (uris != null) { 156 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); 157 try { 158 for (String uriStr : uriArr) { 159 URI uri = new URI(uriStr); 160 URIHandler handler = Services.get().get(URIHandlerService.class).getURIHandler(uri); 161 String schemeWithAuthority = uri.getScheme() + "://" + uri.getAuthority(); 162 if (!uriHandlerContextMap.containsKey(schemeWithAuthority)) { 163 Context context = handler.getContext(uri, coordJobConf, coordJob.getUser(), false); 164 uriHandlerContextMap.put(schemeWithAuthority, context); 165 } 166 handler.delete(uri, uriHandlerContextMap.get(schemeWithAuthority)); 167 LOG.info("Cleanup the output data " + uri.toString()); 168 } 169 } 170 catch (URISyntaxException e) { 171 throw new CommandException(ErrorCode.E0907, e.getMessage()); 172 } 173 catch (URIHandlerException e) { 174 throw new CommandException(ErrorCode.E0907, e.getMessage()); 175 } 176 } 177 } 178 } 179 180 } 181 else { 182 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); 183 } 184 } 185 186 /** 187 * Refresh an action's input and ouput events. 188 * 189 * @param coordJob coordinator job bean 190 * @param coordAction coordinator action bean 191 * @throws Exception thrown if failed to materialize coordinator action 192 */ 193 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception { 194 Configuration jobConf = null; 195 try { 196 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 197 } 198 catch (IOException ioe) { 199 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 200 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); 201 } 202 String jobXml = coordJob.getJobXml(); 203 Element eJob = XmlUtils.parseXml(jobXml); 204 Date actualTime = new Date(); 205 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction 206 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction); 207 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml=" 208 + XmlUtils.prettyPrint(actionXml).toString()); 209 coordAction.setActionXml(actionXml); 210 } 211 212 /** 213 * Update an action into database table 214 * 215 * @param coordJob coordinator job bean 216 * @param coordAction coordinator action bean 217 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event 218 */ 219 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) 220 throws Exception { 221 LOG.debug("updateAction for actionId=" + coordAction.getId()); 222 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) { 223 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId()); 224 coordAction.setCreatedTime(new Date()); 225 } 226 coordAction.setStatus(CoordinatorAction.Status.WAITING); 227 if(!failed) { 228 coordAction.setExternalId(null); 229 } 230 coordAction.setExternalStatus(null); 231 coordAction.setRerunTime(new Date()); 232 coordAction.setLastModifiedTime(new Date()); 233 coordAction.setErrorCode(""); 234 coordAction.setErrorMessage(""); 235 236 // Pushing the configuration which passed through rerun. 237 if(actionRunConf != null && actionRunConf.size() > 0) { 238 Configuration createdConf = null; 239 if(coordAction.getCreatedConf() != null ) { 240 createdConf = new XConfiguration(new StringReader(coordAction.getCreatedConf())); 241 } else { 242 createdConf = new Configuration(); 243 } 244 createdConf.set(RERUN_CONF, XmlUtils.prettyPrint(actionRunConf).toString()); 245 coordAction.setCreatedConf(XmlUtils.prettyPrint(createdConf).toString()); 246 } 247 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, coordAction)); 248 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup()); 249 } 250 251 /** 252 * Create SLA RegistrationEvent 253 * 254 * @param actionXml action xml 255 * @param actionBean coordinator action bean 256 * @param user user name 257 * @param group group name 258 * @throws Exception thrown if unable to write sla registration event 259 */ 260 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group) 261 throws Exception { 262 Element eAction = XmlUtils.parseXml(actionXml); 263 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 264 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), 265 SlaAppType.COORDINATOR_ACTION, user, group, LOG); 266 if(slaEvent != null) { 267 insertList.add(slaEvent); 268 } 269 } 270 271 /* (non-Javadoc) 272 * @see org.apache.oozie.command.XCommand#getEntityKey() 273 */ 274 @Override 275 public String getEntityKey() { 276 return jobId; 277 } 278 279 /* (non-Javadoc) 280 * @see org.apache.oozie.command.XCommand#isLockRequired() 281 */ 282 @Override 283 protected boolean isLockRequired() { 284 return true; 285 } 286 287 /* (non-Javadoc) 288 * @see org.apache.oozie.command.XCommand#loadState() 289 */ 290 @Override 291 protected void loadState() throws CommandException { 292 try { 293 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); 294 prevPending = coordJob.isPending(); 295 } 296 catch (JPAExecutorException je) { 297 throw new CommandException(je); 298 } 299 LogUtils.setLogInfo(coordJob); 300 } 301 302 /* (non-Javadoc) 303 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 304 */ 305 @Override 306 protected void verifyPrecondition() throws CommandException, PreconditionException { 307 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus()); 308 309 // no actions have been created for PREP job 310 if (coordJob.getStatus() == CoordinatorJob.Status.PREP || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) { 311 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 312 // Call the parent so the pending flag is reset and state transition 313 // of bundle can happen 314 if (coordJob.getBundleId() != null) { 315 bundleStatusUpdate.call(); 316 } 317 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 318 throw new CommandException(ErrorCode.E1018, 319 "coordinator job is PREP so no actions are materialized to rerun!"); 320 } 321 else { 322 throw new CommandException(ErrorCode.E1018, 323 "coordinator job is IGNORED, please change it to RUNNING before rerunning actions"); 324 } 325 } 326 } 327 328 @Override 329 protected void eagerLoadState() throws CommandException { 330 try { 331 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); 332 } 333 catch (JPAExecutorException e) { 334 throw new CommandException(e); 335 } 336 } 337 338 @Override 339 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 340 verifyPrecondition(); 341 } 342 343 @Override 344 public void rerunChildren() throws CommandException { 345 boolean isError = false; 346 try { 347 CoordinatorActionInfo coordInfo = null; 348 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 349 List<CoordinatorActionBean> coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false); 350 if (checkAllActionsRunnable(coordActions)) { 351 Map<String, Context> uriHandlerContextMap = new HashMap<String, Context>(); 352 Configuration coordJobConf = null; 353 try { 354 coordJobConf = new XConfiguration(new StringReader(coordJob.getConf())); 355 } 356 catch (IOException e) { 357 throw new CommandException(ErrorCode.E0907, "failed to read coord job conf to clean up output data"); 358 } 359 try { 360 for (CoordinatorActionBean coordAction : coordActions) { 361 String actionXml = coordAction.getActionXml(); 362 // Cleanup activity should not run when failed option has been provided 363 if (!noCleanup && !failed) { 364 Element eAction = XmlUtils.parseXml(actionXml); 365 cleanupOutputEvents(eAction, coordJobConf, uriHandlerContextMap); 366 } 367 if (refresh) { 368 refreshAction(coordJob, coordAction); 369 } 370 updateAction(coordJob, coordAction); 371 if (SLAService.isEnabled()) { 372 SLAOperations.updateRegistrationEvent(coordAction.getId()); 373 } 374 queue(new CoordActionNotificationXCommand(coordAction), 100); 375 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100); 376 if (coordAction.getPushMissingDependencies() != null) { 377 queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100); 378 } 379 } 380 } 381 finally { 382 Iterator<Entry<String, Context>> itr = uriHandlerContextMap.entrySet().iterator(); 383 while (itr.hasNext()) { 384 Entry<String, Context> entry = itr.next(); 385 entry.getValue().destroy(); 386 itr.remove(); 387 } 388 } 389 } 390 else { 391 isError = true; 392 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!"); 393 } 394 coordInfo = new CoordinatorActionInfo(coordActions); 395 396 ret = coordInfo; 397 } 398 catch (XException xex) { 399 isError = true; 400 throw new CommandException(xex); 401 } 402 catch (JDOMException jex) { 403 isError = true; 404 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 405 } 406 catch (Exception ex) { 407 isError = true; 408 throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex); 409 } 410 finally{ 411 if(isError){ 412 transitToPrevious(); 413 } 414 } 415 } 416 417 /* 418 * (non-Javadoc) 419 * @see org.apache.oozie.command.TransitionXCommand#getJob() 420 */ 421 @Override 422 public Job getJob() { 423 return coordJob; 424 } 425 426 @Override 427 public void notifyParent() throws CommandException { 428 //update bundle action 429 if (getPrevStatus() != null && coordJob.getBundleId() != null) { 430 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus()); 431 bundleStatusUpdate.call(); 432 } 433 } 434 435 @Override 436 public void updateJob() { 437 if (getPrevStatus()!= null){ 438 Job.Status coordJobStatus = getPrevStatus(); 439 if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) { 440 coordJob.setStatus(coordJobStatus); 441 } 442 if (prevPending) { 443 coordJob.setPending(); 444 } else { 445 coordJob.resetPending(); 446 } 447 } 448 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob)); 449 } 450 451 /* (non-Javadoc) 452 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites() 453 */ 454 @Override 455 public void performWrites() throws CommandException { 456 try { 457 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 458 if (EventHandlerService.isEnabled()) { 459 generateEvents(coordJob, null); 460 } 461 } 462 catch (JPAExecutorException e) { 463 throw new CommandException(e); 464 } 465 } 466 467 /* (non-Javadoc) 468 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 469 */ 470 @Override 471 public XLog getLog() { 472 return LOG; 473 } 474 475 @Override 476 public final void transitToNext() { 477 prevStatus = coordJob.getStatus(); 478 if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED 479 || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) { 480 coordJob.setStatus(Job.Status.RUNNING); 481 } 482 else { 483 // Check for backward compatibility for Oozie versions (3.2 and before) 484 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 485 // PAUSEDWITHERROR is not supported 486 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); 487 } 488 // used for backward support of coordinator 0.1 schema 489 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus)); 490 coordJob.setPending(); 491 } 492 493 private final void transitToPrevious() throws CommandException { 494 coordJob.setStatus(getPrevStatus()); 495 if (!prevPending) { 496 coordJob.resetPending(); 497 } 498 else { 499 coordJob.setPending(); 500 } 501 } 502}