/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorActionInfo;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.hadoop.FsActionExecutor;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.RerunTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * Rerun coordinator actions selected by a list of dates or a list of action ids. The caller can
 * additionally request a refresh of the actions and/or suppress the cleanup of their output.
 * <p>
 * The "rerunType" can be set as {@link RestConstants#JOB_COORD_RERUN_DATE} or
 * {@link RestConstants#JOB_COORD_RERUN_ACTION}.
 * <p>
 * The "refresh" flag indicates that the action xml of every rerun action is to be re-materialized
 * from the coordinator job definition before the action is rerun.
 * <p>
 * The "noCleanup" flag indicates that the output-events directories of the rerun actions must NOT be
 * deleted; when it is false the directories are deleted before the actions are rerun.
 */
@SuppressWarnings("deprecation")
public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {

    private String rerunType;
    private String scope;
    private boolean refresh;
    private boolean noCleanup;
    private CoordinatorJobBean coordJob = null;
    private JPAService jpaService = null;
    /** Pending flag of the coordinator job as it was when the job was loaded in {@link #loadState()}. */
    protected boolean prevPending;

    /**
     * The constructor for class {@link CoordRerunXCommand}
     *
     * @param jobId the job id, must not be empty
     * @param rerunType rerun type {@link RestConstants#JOB_COORD_RERUN_DATE} or
     *        {@link RestConstants#JOB_COORD_RERUN_ACTION}, must not be empty
     * @param scope the rerun scope for given rerunType separated by ",", must not be empty
     * @param refresh true if user wants to refresh input/output dataset urls
     * @param noCleanup true to keep the output events of the rerun actions, false to clean them up
     */
    public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
        super("coord_rerun", "coord_rerun", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
        this.scope = ParamChecker.notEmpty(scope, "scope");
        this.refresh = refresh;
        this.noCleanup = noCleanup;
    }

    /**
     * Check if all given actions are eligible to rerun, i.e. every action is in a terminal status.
     *
     * @param coordActions list of CoordinatorActionBean, must not be null
     * @return true if the list is non-empty and all actions are eligible to rerun; false for an empty
     *         list or as soon as one action is not in a terminal status
     */
    private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
        ParamChecker.notNull(coordActions, "Coord actions to be rerun");
        // An empty selection is deliberately reported as "not runnable".
        if (coordActions.isEmpty()) {
            return false;
        }
        for (CoordinatorActionBean coordAction : coordActions) {
            if (!coordAction.isTerminalStatus()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Get the list of actions for a given coordinator job
     *
     * @param rerunType the rerun type (date, action)
     * @param jobId the coordinator job id
     * @param scope the date scope or action id scope
     * @return the list of Coordinator actions, or null if rerunType is neither
     *         {@link RestConstants#JOB_COORD_RERUN_DATE} nor {@link RestConstants#JOB_COORD_RERUN_ACTION}
     * @throws CommandException propagated from the {@link CoordUtils} lookup
     */
    public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope)
            throws CommandException {
        // Constant-first comparison so that a null rerunType yields null like any other unknown type.
        if (RestConstants.JOB_COORD_RERUN_DATE.equals(rerunType)) {
            return CoordUtils.getCoordActionsFromDates(jobId, scope);
        }
        if (RestConstants.JOB_COORD_RERUN_ACTION.equals(rerunType)) {
            return CoordUtils.getCoordActionsFromIds(jobId, scope);
        }
        return null;
    }

    /**
     * Cleanup output-events directories.
     * <p>
     * Every uri listed in the "uris" child of each "data-out" element under "output-events" is deleted.
     * A failure to delete a single directory is logged and does not stop the cleanup. Empty uri
     * entries are skipped because a {@link Path} cannot be created from an empty string.
     *
     * @param eAction coordinator action xml
     * @param user user name
     * @param group group name
     */
    @SuppressWarnings("unchecked")
    private void cleanupOutputEvents(Element eAction, String user, String group) {
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        if (outputList == null) {
            LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
            return;
        }
        for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
            Element urisElement = data.getChild("uris", data.getNamespace());
            if (urisElement == null) {
                continue;
            }
            // getTextTrim() yields an empty string (not null) for an element without text.
            String uris = urisElement.getTextTrim();
            if (uris == null || uris.length() == 0) {
                continue;
            }
            FsActionExecutor fsAe = new FsActionExecutor();
            for (String uri : uris.split(CoordELFunctions.INSTANCE_SEPARATOR)) {
                if (uri.trim().length() == 0) {
                    // e.g. a leading separator; nothing to delete for this entry
                    continue;
                }
                Path path = new Path(uri);
                try {
                    fsAe.delete(user, group, path);
                    LOG.debug("Cleanup the output dir " + path);
                }
                catch (ActionExecutorException ae) {
                    // best effort: keep going with the remaining directories
                    LOG.warn("Failed to cleanup the output dir " + uri, ae);
                }
            }
        }
    }

    /**
     * Refresh an action's input and output events by re-materializing its action xml from the
     * coordinator job xml and job configuration, and storing the result in the action bean.
     *
     * @param coordJob coordinator job bean
     * @param coordAction coordinator action bean
     * @throws Exception thrown if failed to materialize coordinator action
     */
    private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
        Configuration jobConf = null;
        try {
            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
        }
        catch (IOException ioe) {
            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
            throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
        }
        Element eJob = XmlUtils.parseXml(coordJob.getJobXml());
        Date actualTime = new Date();
        // The job element is cloned so the parsed job xml itself is not handed to the materialization.
        String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
                coordAction.getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
        LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
                + XmlUtils.prettyPrint(actionXml).toString());
        coordAction.setActionXml(actionXml);
    }

    /**
     * Reset an action to WAITING, queue it for the bulk database update and register its SLA event.
     * <p>
     * The SLA registration is built from the action xml currently held by the action bean (which
     * reflects a preceding refresh, if any); the <code>actionXml</code> argument is not used.
     *
     * @param coordJob coordinator job bean
     * @param coordAction coordinator action bean
     * @param actionXml coordinator action xml (unused, see above)
     * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
     */
    private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
            throws Exception {
        LOG.debug("updateAction for actionId=" + coordAction.getId());
        if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
            LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
            coordAction.setCreatedTime(new Date());
        }
        coordAction.setStatus(CoordinatorAction.Status.WAITING);
        coordAction.setExternalId("");
        coordAction.setExternalStatus("");
        coordAction.setRerunTime(new Date());
        coordAction.setLastModifiedTime(new Date());
        updateList.add(coordAction);
        writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
    }

    /**
     * Create SLA RegistrationEvent and, if one was created, queue it for insertion.
     *
     * @param actionXml action xml
     * @param actionBean coordinator action bean
     * @param user user name
     * @param group group name
     * @throws Exception thrown if unable to write sla registration event
     */
    private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user,
            String group) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml);
        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info",
                eAction.getNamespace("sla"));
        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
                SlaAppType.COORDINATOR_ACTION, user, group, LOG);
        if (slaEvent != null) {
            insertList.add(slaEvent);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        try {
            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
            // remembered so the pending flag can be restored by updateJob()/transitToPrevious()
            prevPending = coordJob.isPending();
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LogUtils.setLogInfo(coordJob, logInfo);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
                || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
            rejectRerun("coordinator job is killed or failed so all actions are not eligible to rerun!");
        }

        // no actions have been created for PREP job
        if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
            rejectRerun("coordinator job is PREP so no actions are materialized to rerun!");
        }
    }

    /**
     * Reject the rerun: log the job status, update the bundle (if the job belongs to one) and fail
     * with {@link ErrorCode#E1018}.
     *
     * @param reason message of the thrown exception
     * @throws CommandException always
     */
    private void rejectRerun(String reason) throws CommandException {
        LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
        // Call the parent so the pending flag is reset and state transition
        // of bundle can happen
        if (coordJob.getBundleId() != null) {
            new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus()).call();
        }
        throw new CommandException(ErrorCode.E1018, reason);
    }

    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        verifyPrecondition();
    }

    /**
     * Rerun the selected coordinator actions.
     * <p>
     * For every action: optionally clean up its output events, optionally refresh its action xml,
     * reset it to WAITING and queue the notification and input-check commands. The result is stored
     * in <code>ret</code>. On any failure the coordinator job is put back to its previous status and
     * pending flag before the exception is rethrown as a {@link CommandException}.
     *
     * @throws CommandException if the selection is not eligible to rerun or any step fails
     */
    @Override
    public void rerunChildren() throws CommandException {
        boolean isError = false;
        try {
            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
            List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
            if (!checkAllActionsRunnable(coordActions)) {
                throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
            }
            for (CoordinatorActionBean coordAction : coordActions) {
                // Captured before any refresh: the cleanup works on the action xml as currently stored.
                String actionXml = coordAction.getActionXml();
                if (!noCleanup) {
                    Element eAction = XmlUtils.parseXml(actionXml);
                    cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
                }
                if (refresh) {
                    refreshAction(coordJob, coordAction);
                }
                updateAction(coordJob, coordAction, actionXml);
                if (SLAService.isEnabled()) {
                    SLAOperations.updateRegistrationEvent(coordAction.getId());
                }
                queue(new CoordActionNotificationXCommand(coordAction), 100);
                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
            }
            ret = new CoordinatorActionInfo(coordActions);
        }
        catch (XException xex) {
            isError = true;
            throw new CommandException(xex);
        }
        catch (JDOMException jex) {
            isError = true;
            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
        }
        catch (Exception ex) {
            isError = true;
            throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex);
        }
        finally {
            if (isError) {
                transitToPrevious();
            }
        }
    }

    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    @Override
    public void notifyParent() throws CommandException {
        // update bundle action
        if (getPrevStatus() != null && coordJob.getBundleId() != null) {
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
            bundleStatusUpdate.call();
        }
    }

    /**
     * Queue the coordinator job for the bulk update. When a previous status is known, a PAUSED or
     * PAUSEDWITHERROR previous status is restored on the job and the pending flag is set back to the
     * value it had when the job was loaded.
     */
    @Override
    public void updateJob() {
        if (getPrevStatus() != null) {
            Job.Status coordJobStatus = getPrevStatus();
            if (coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                coordJob.setStatus(coordJobStatus);
            }
            if (prevPending) {
                coordJob.setPending();
            }
            else {
                coordJob.resetPending();
            }
        }
        updateList.add(coordJob);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
            if (EventHandlerService.isEnabled()) {
                generateEvents(coordJob);
            }
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
     */
    @Override
    public XLog getLog() {
        return LOG;
    }

    /**
     * Record the current job status as the previous status, move the job to its rerun status and mark
     * it pending.
     */
    @Override
    public final void transitToNext() {
        prevStatus = coordJob.getStatus();
        if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
                || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
            coordJob.setStatus(Job.Status.RUNNING);
        }
        else {
            // Check for backward compatibility for Oozie versions (3.2 and before)
            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
            // PAUSEDWITHERROR is not supported
            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
        }
        // used for backward support of coordinator 0.1 schema
        coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
        coordJob.setPending();
    }

    /**
     * Put the coordinator job back to the status and pending flag it had before the rerun started.
     */
    private void transitToPrevious() {
        coordJob.setStatus(getPrevStatus());
        if (prevPending) {
            coordJob.setPending();
        }
        else {
            coordJob.resetPending();
        }
    }
}