001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.io.IOException; 021import java.io.StringReader; 022import java.util.Date; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.Path; 026import org.apache.oozie.CoordinatorActionBean; 027import org.apache.oozie.CoordinatorActionInfo; 028import org.apache.oozie.CoordinatorJobBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.SLAEventBean; 031import org.apache.oozie.XException; 032import org.apache.oozie.action.ActionExecutorException; 033import org.apache.oozie.action.hadoop.FsActionExecutor; 034import org.apache.oozie.client.CoordinatorAction; 035import org.apache.oozie.client.CoordinatorJob; 036import org.apache.oozie.client.Job; 037import org.apache.oozie.client.SLAEvent.SlaAppType; 038import org.apache.oozie.client.rest.RestConstants; 039import org.apache.oozie.command.CommandException; 040import org.apache.oozie.command.PreconditionException; 041import org.apache.oozie.command.RerunTransitionXCommand; 042import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 043import 
org.apache.oozie.coord.CoordELFunctions; 044import org.apache.oozie.coord.CoordUtils; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 047import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 048import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 049import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 050import org.apache.oozie.executor.jpa.JPAExecutorException; 051import org.apache.oozie.service.EventHandlerService; 052import org.apache.oozie.sla.SLAOperations; 053import org.apache.oozie.sla.service.SLAService; 054import org.apache.oozie.util.InstrumentUtils; 055import org.apache.oozie.util.LogUtils; 056import org.apache.oozie.util.ParamChecker; 057import org.apache.oozie.util.StatusUtils; 058import org.apache.oozie.util.XConfiguration; 059import org.apache.oozie.util.XLog; 060import org.apache.oozie.util.XmlUtils; 061import org.apache.oozie.util.db.SLADbOperations; 062import org.jdom.Element; 063import org.jdom.JDOMException; 064 065/** 066 * Rerun coordinator actions selected by a list of dates or action ids. The user can also specify the "refresh" and "noCleanup" options. 067 * <p/> 068 * The "rerunType" can be set to {@link RestConstants#JOB_COORD_RERUN_DATE} or 069 * {@link RestConstants#JOB_COORD_RERUN_ACTION}. 070 * <p/> 071 * The "refresh" flag indicates whether the user wants to refresh an action's input and output events. 
072 * <p/> 073 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 074 */ 075@SuppressWarnings("deprecation") 076public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> { 077 078 private String rerunType; 079 private String scope; 080 private boolean refresh; 081 private boolean noCleanup; 082 private CoordinatorJobBean coordJob = null; 083 protected boolean prevPending; 084 085 /** 086 * The constructor for class {@link CoordRerunXCommand} 087 * 088 * @param jobId the job id 089 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION} 090 * @param scope the rerun scope for given rerunType separated by "," 091 * @param refresh true if user wants to refresh input/output dataset urls 092 * @param noCleanup false if user wants to cleanup output events for given rerun actions 093 */ 094 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) { 095 super("coord_rerun", "coord_rerun", 1); 096 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 097 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType"); 098 this.scope = ParamChecker.notEmpty(scope, "scope"); 099 this.refresh = refresh; 100 this.noCleanup = noCleanup; 101 } 102 103 /** 104 * Check if all given actions are eligible to rerun. 
105 * 106 * @param actions list of CoordinatorActionBean 107 * @return true if all actions are eligible to rerun 108 */ 109 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) { 110 ParamChecker.notNull(coordActions, "Coord actions to be rerun"); 111 boolean ret = false; 112 for (CoordinatorActionBean coordAction : coordActions) { 113 ret = true; 114 if (!coordAction.isTerminalStatus()) { 115 ret = false; 116 break; 117 } 118 } 119 return ret; 120 } 121 122 /** 123 * Cleanup output-events directories 124 * 125 * @param eAction coordinator action xml 126 * @param user user name 127 * @param group group name 128 */ 129 @SuppressWarnings("unchecked") 130 private void cleanupOutputEvents(Element eAction, String user, String group) { 131 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 132 if (outputList != null) { 133 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 134 if (data.getChild("uris", data.getNamespace()) != null) { 135 String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); 136 if (uris != null) { 137 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); 138 FsActionExecutor fsAe = new FsActionExecutor(); 139 for (String uri : uriArr) { 140 Path path = new Path(uri); 141 try { 142 fsAe.delete(user, group, path); 143 LOG.debug("Cleanup the output dir " + path); 144 } 145 catch (ActionExecutorException ae) { 146 LOG.warn("Failed to cleanup the output dir " + uri, ae); 147 } 148 } 149 } 150 151 } 152 } 153 } 154 else { 155 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); 156 } 157 } 158 159 /** 160 * Refresh an action's input and ouput events. 
161 * 162 * @param coordJob coordinator job bean 163 * @param coordAction coordinator action bean 164 * @throws Exception thrown if failed to materialize coordinator action 165 */ 166 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception { 167 Configuration jobConf = null; 168 try { 169 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 170 } 171 catch (IOException ioe) { 172 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 173 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); 174 } 175 String jobXml = coordJob.getJobXml(); 176 Element eJob = XmlUtils.parseXml(jobXml); 177 Date actualTime = new Date(); 178 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction 179 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction); 180 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml=" 181 + XmlUtils.prettyPrint(actionXml).toString()); 182 coordAction.setActionXml(actionXml); 183 } 184 185 /** 186 * Update an action into database table 187 * 188 * @param coordJob coordinator job bean 189 * @param coordAction coordinator action bean 190 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event 191 */ 192 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) 193 throws Exception { 194 LOG.debug("updateAction for actionId=" + coordAction.getId()); 195 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) { 196 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId()); 197 coordAction.setCreatedTime(new Date()); 198 } 199 coordAction.setStatus(CoordinatorAction.Status.WAITING); 200 coordAction.setExternalId(null); 201 coordAction.setExternalStatus(null); 202 coordAction.setRerunTime(new Date()); 203 
coordAction.setLastModifiedTime(new Date()); 204 coordAction.setErrorCode(""); 205 coordAction.setErrorMessage(""); 206 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, coordAction)); 207 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup()); 208 } 209 210 /** 211 * Create SLA RegistrationEvent 212 * 213 * @param actionXml action xml 214 * @param actionBean coordinator action bean 215 * @param user user name 216 * @param group group name 217 * @throws Exception thrown if unable to write sla registration event 218 */ 219 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group) 220 throws Exception { 221 Element eAction = XmlUtils.parseXml(actionXml); 222 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 223 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), 224 SlaAppType.COORDINATOR_ACTION, user, group, LOG); 225 if(slaEvent != null) { 226 insertList.add(slaEvent); 227 } 228 } 229 230 /* (non-Javadoc) 231 * @see org.apache.oozie.command.XCommand#getEntityKey() 232 */ 233 @Override 234 public String getEntityKey() { 235 return jobId; 236 } 237 238 /* (non-Javadoc) 239 * @see org.apache.oozie.command.XCommand#isLockRequired() 240 */ 241 @Override 242 protected boolean isLockRequired() { 243 return true; 244 } 245 246 /* (non-Javadoc) 247 * @see org.apache.oozie.command.XCommand#loadState() 248 */ 249 @Override 250 protected void loadState() throws CommandException { 251 try { 252 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); 253 prevPending = coordJob.isPending(); 254 } 255 catch (JPAExecutorException je) { 256 throw new CommandException(je); 257 } 258 LogUtils.setLogInfo(coordJob, logInfo); 259 } 260 261 /* (non-Javadoc) 262 * @see 
org.apache.oozie.command.XCommand#verifyPrecondition() 263 */ 264 @Override 265 protected void verifyPrecondition() throws CommandException, PreconditionException { 266 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus()); 267 268 // no actions have been created for PREP job 269 if (coordJob.getStatus() == CoordinatorJob.Status.PREP || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) { 270 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 271 // Call the parent so the pending flag is reset and state transition 272 // of bundle can happen 273 if (coordJob.getBundleId() != null) { 274 bundleStatusUpdate.call(); 275 } 276 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 277 throw new CommandException(ErrorCode.E1018, 278 "coordinator job is PREP so no actions are materialized to rerun!"); 279 } 280 else { 281 throw new CommandException(ErrorCode.E1018, 282 "coordinator job is IGNORED, please change it to RUNNING before rerunning actions"); 283 } 284 } 285 } 286 287 @Override 288 protected void eagerLoadState() throws CommandException { 289 try { 290 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); 291 } 292 catch (JPAExecutorException e) { 293 throw new CommandException(e); 294 } 295 } 296 297 @Override 298 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 299 verifyPrecondition(); 300 } 301 302 @Override 303 public void rerunChildren() throws CommandException { 304 boolean isError = false; 305 try { 306 CoordinatorActionInfo coordInfo = null; 307 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 308 List<CoordinatorActionBean> coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false); 309 if (checkAllActionsRunnable(coordActions)) { 310 for (CoordinatorActionBean coordAction : coordActions) { 311 String actionXml = 
coordAction.getActionXml(); 312 if (!noCleanup) { 313 Element eAction = XmlUtils.parseXml(actionXml); 314 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup()); 315 } 316 if (refresh) { 317 refreshAction(coordJob, coordAction); 318 } 319 updateAction(coordJob, coordAction); 320 if (SLAService.isEnabled()) { 321 SLAOperations.updateRegistrationEvent(coordAction.getId()); 322 } 323 queue(new CoordActionNotificationXCommand(coordAction), 100); 324 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100); 325 } 326 } 327 else { 328 isError = true; 329 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!"); 330 } 331 coordInfo = new CoordinatorActionInfo(coordActions); 332 333 ret = coordInfo; 334 } 335 catch (XException xex) { 336 isError = true; 337 throw new CommandException(xex); 338 } 339 catch (JDOMException jex) { 340 isError = true; 341 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 342 } 343 catch (Exception ex) { 344 isError = true; 345 throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex); 346 } 347 finally{ 348 if(isError){ 349 transitToPrevious(); 350 } 351 } 352 } 353 354 /* 355 * (non-Javadoc) 356 * @see org.apache.oozie.command.TransitionXCommand#getJob() 357 */ 358 @Override 359 public Job getJob() { 360 return coordJob; 361 } 362 363 @Override 364 public void notifyParent() throws CommandException { 365 //update bundle action 366 if (getPrevStatus() != null && coordJob.getBundleId() != null) { 367 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus()); 368 bundleStatusUpdate.call(); 369 } 370 } 371 372 @Override 373 public void updateJob() { 374 if (getPrevStatus()!= null){ 375 Job.Status coordJobStatus = getPrevStatus(); 376 if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) { 377 coordJob.setStatus(coordJobStatus); 378 } 379 if 
(prevPending) { 380 coordJob.setPending(); 381 } else { 382 coordJob.resetPending(); 383 } 384 } 385 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob)); 386 } 387 388 /* (non-Javadoc) 389 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites() 390 */ 391 @Override 392 public void performWrites() throws CommandException { 393 try { 394 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 395 if (EventHandlerService.isEnabled()) { 396 generateEvents(coordJob, null); 397 } 398 } 399 catch (JPAExecutorException e) { 400 throw new CommandException(e); 401 } 402 } 403 404 /* (non-Javadoc) 405 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 406 */ 407 @Override 408 public XLog getLog() { 409 return LOG; 410 } 411 412 @Override 413 public final void transitToNext() { 414 prevStatus = coordJob.getStatus(); 415 if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED 416 || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) { 417 coordJob.setStatus(Job.Status.RUNNING); 418 } 419 else { 420 // Check for backward compatibility for Oozie versions (3.2 and before) 421 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 422 // PAUSEDWITHERROR is not supported 423 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); 424 } 425 // used for backward support of coordinator 0.1 schema 426 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus)); 427 coordJob.setPending(); 428 } 429 430 private final void transitToPrevious() throws CommandException { 431 coordJob.setStatus(getPrevStatus()); 432 if (!prevPending) { 433 coordJob.resetPending(); 434 } 435 else { 436 coordJob.setPending(); 437 } 438 } 439}