/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.StringReader;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorActionInfo;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.hadoop.FsActionExecutor;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.RerunTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
 * <p>
 * The "rerunType" can be set as {@link RestConstants#JOB_COORD_RERUN_DATE} or
 * {@link RestConstants#JOB_COORD_RERUN_ACTION}.
 * <p>
 * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
 * <p>
 * The "noCleanup" is used to indicate if user wants to skip the cleanup of output events for the
 * given rerun actions (cleanup is performed when it is false).
 */
public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {

    private String rerunType;
    private String scope;
    private boolean refresh;
    private boolean noCleanup;
    private CoordinatorJobBean coordJob = null;
    private JPAService jpaService = null;
    // Pending flag of the coordinator job as read in loadState(); restored by updateJob()/transitToPrevious().
    protected boolean prevPending;

    /**
     * The constructor for class {@link CoordRerunXCommand}
     *
     * @param jobId the job id
     * @param rerunType rerun type {@link RestConstants#JOB_COORD_RERUN_DATE} or
     *        {@link RestConstants#JOB_COORD_RERUN_ACTION}
     * @param scope the rerun scope for given rerunType separated by ","
     * @param refresh true if user wants to refresh input/output dataset urls
     * @param noCleanup false if user wants to cleanup output events for given rerun actions
     */
    public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
        super("coord_rerun", "coord_rerun", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
        this.scope = ParamChecker.notEmpty(scope, "scope");
        this.refresh = refresh;
        this.noCleanup = noCleanup;
    }

    /**
     * Check if all given actions are eligible to rerun, i.e. every action reports a terminal status.
     *
     * @param coordActions list of CoordinatorActionBean; checked with {@link ParamChecker#notNull}
     * @return true if the list is non-empty and all actions are in a terminal status; false for an empty list
     *         or as soon as one non-terminal action is found
     */
    private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
        ParamChecker.notNull(coordActions, "Coord actions to be rerun");
        if (coordActions.isEmpty()) {
            // nothing selected by the scope, so there is nothing that can be rerun
            return false;
        }
        for (CoordinatorActionBean coordAction : coordActions) {
            if (!coordAction.isTerminalStatus()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Get the list of actions for a given coordinator job
     *
     * @param rerunType the rerun type (date, action)
     * @param jobId the coordinator job id
     * @param scope the date scope or action id scope
     * @return the list of Coordinator actions, or null if rerunType is neither
     *         {@link RestConstants#JOB_COORD_RERUN_DATE} nor {@link RestConstants#JOB_COORD_RERUN_ACTION}
     * @throws CommandException propagated from the {@link CoordUtils} lookup
     */
    public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope)
            throws CommandException {
        List<CoordinatorActionBean> coordActions = null;
        if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
            coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
        }
        else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
            coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
        }
        return coordActions;
    }

    /**
     * Cleanup output-events directories.
     * <p>
     * Every uri listed in a "data-out/uris" element of the action xml is deleted through
     * {@link FsActionExecutor}. A failed delete is logged and does not stop the remaining cleanup.
     * Empty "uris" text and empty uri segments are skipped, because there is nothing to delete for them
     * and an empty string cannot be turned into a {@link Path}.
     *
     * @param eAction coordinator action xml
     * @param user user name
     * @param group group name
     */
    @SuppressWarnings("unchecked")
    private void cleanupOutputEvents(Element eAction, String user, String group) {
        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
        if (outputList == null) {
            LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
            return;
        }
        for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
            Element urisElement = data.getChild("uris", data.getNamespace());
            if (urisElement == null) {
                continue;
            }
            String uris = urisElement.getTextTrim();
            // getTextTrim() yields an empty string (not null) when the element has no text
            if (uris == null || uris.length() == 0) {
                continue;
            }
            String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
            FsActionExecutor fsAe = new FsActionExecutor();
            for (String uri : uriArr) {
                if (uri.length() == 0) {
                    // adjacent or leading separators produce empty segments; skip them
                    continue;
                }
                Path path = new Path(uri);
                try {
                    fsAe.delete(user, group, path);
                    LOG.debug("Cleanup the output dir " + path);
                }
                catch (ActionExecutorException ae) {
                    // best effort: keep cleaning the remaining directories
                    LOG.warn("Failed to cleanup the output dir " + uri, ae);
                }
            }
        }
    }

    /**
     * Refresh an action's input and output events by re-materializing the action xml from the job
     * definition and storing it on the action bean.
     *
     * @param coordJob coordinator job bean
     * @param coordAction coordinator action bean; its action xml is replaced
     * @throws Exception thrown if failed to materialize coordinator action; a job configuration that cannot be
     *         parsed is reported as a CommandException with E1005
     */
    private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
        Configuration jobConf = null;
        try {
            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
        }
        catch (IOException ioe) {
            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
            throw new CommandException(ErrorCode.E1005, ioe);
        }
        String jobXml = coordJob.getJobXml();
        Element eJob = XmlUtils.parseXml(jobXml);
        Date actualTime = new Date();
        // a clone of the parsed job xml is handed over, so eJob itself is not passed to the materialization
        String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
                coordAction.getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
        LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
                + XmlUtils.prettyPrint(actionXml).toString());
        coordAction.setActionXml(actionXml);
    }

    /**
     * Reset an action to WAITING, add it to the update list and register its SLA event.
     *
     * @param coordJob coordinator job bean
     * @param coordAction coordinator action bean
     * @param actionXml coordinator action xml; not read by this method, the xml currently stored on
     *        coordAction (possibly refreshed) is used for the SLA registration instead
     * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
     */
    private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
            throws Exception {
        LOG.debug("updateAction for actionId=" + coordAction.getId());
        if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
            LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
            coordAction.setCreatedTime(new Date());
        }
        coordAction.setStatus(CoordinatorAction.Status.WAITING);
        coordAction.setExternalId("");
        coordAction.setExternalStatus("");
        coordAction.setRerunTime(new Date());
        coordAction.setLastModifiedTime(new Date());
        updateList.add(coordAction);
        writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
    }

    /**
     * Create SLA RegistrationEvent and add it to the insert list when one is produced.
     *
     * @param actionXml action xml
     * @param actionBean coordinator action bean
     * @param user user name
     * @param group group name
     * @throws Exception thrown if unable to write sla registration event
     */
    private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user,
            String group) throws Exception {
        Element eAction = XmlUtils.parseXml(actionXml);
        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info",
                eAction.getNamespace("sla"));
        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
                SlaAppType.COORDINATOR_ACTION, user, group, LOG);
        if (slaEvent != null) {
            insertList.add(slaEvent);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        try {
            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
            // remember the pending flag so it can be restored later
            prevPending = coordJob.isPending();
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LogUtils.setLogInfo(coordJob, logInfo);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob,
                coordJob.getStatus());
        if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
                || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
            rejectRerun(bundleStatusUpdate,
                    "coordinator job is killed or failed so all actions are not eligible to rerun!");
        }

        // no actions have been created for PREP job
        if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
            rejectRerun(bundleStatusUpdate, "coordinator job is PREP so no actions are materialized to rerun!");
        }
    }

    /**
     * Log that the rerun cannot proceed, trigger the bundle status update when the job belongs to a bundle,
     * and fail the command with E1018.
     *
     * @param bundleStatusUpdate bundle status update command to call when the job has a bundle id
     * @param reason message of the CommandException that is thrown
     * @throws CommandException always (E1018 with the given reason), or whatever the bundle update throws
     */
    private void rejectRerun(BundleStatusUpdateXCommand bundleStatusUpdate, String reason)
            throws CommandException {
        LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
        // Call the parent so the pending flag is reset and state transition
        // of bundle can happen
        if (coordJob.getBundleId() != null) {
            bundleStatusUpdate.call();
        }
        throw new CommandException(ErrorCode.E1018, reason);
    }

    /**
     * Runs the same checks as {@link #verifyPrecondition()}.
     * <p>
     * NOTE(review): those checks dereference coordJob, which this class only assigns in loadState();
     * confirm that the job is loaded before the eager check runs.
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        verifyPrecondition();
    }

    /**
     * Rerun the actions selected by rerunType/scope: optionally cleanup their output directories, optionally
     * refresh their action xml, reset them to WAITING and queue the notification and input check commands.
     * On any failure the coordinator job is put back to its previous status and pending flag.
     *
     * @throws CommandException if the actions are not all eligible to rerun (E1018), if the action xml cannot be
     *         parsed (E0700), or wrapping any other failure
     */
    @Override
    public void rerunChildren() throws CommandException {
        boolean isError = false;
        try {
            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
            List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
            if (!checkAllActionsRunnable(coordActions)) {
                isError = true;
                throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
            }
            for (CoordinatorActionBean coordAction : coordActions) {
                // xml as stored before an optional refresh; cleanup works on the previously materialized uris
                String actionXml = coordAction.getActionXml();
                if (!noCleanup) {
                    Element eAction = XmlUtils.parseXml(actionXml);
                    cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
                }
                if (refresh) {
                    refreshAction(coordJob, coordAction);
                }
                updateAction(coordJob, coordAction, actionXml);

                queue(new CoordActionNotificationXCommand(coordAction), 100);
                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
            }
            ret = new CoordinatorActionInfo(coordActions);
        }
        catch (XException xex) {
            isError = true;
            throw new CommandException(xex);
        }
        catch (JDOMException jex) {
            isError = true;
            throw new CommandException(ErrorCode.E0700, jex);
        }
        catch (Exception ex) {
            isError = true;
            throw new CommandException(ErrorCode.E1018, ex);
        }
        finally {
            if (isError) {
                transitToPrevious();
            }
        }
    }

    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    /**
     * Update the bundle action when a previous status is known and the job belongs to a bundle.
     *
     * @throws CommandException propagated from the bundle status update command
     */
    @Override
    public void notifyParent() throws CommandException {
        //update bundle action
        if (getPrevStatus() != null && coordJob.getBundleId() != null) {
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob,
                    getPrevStatus());
            bundleStatusUpdate.call();
        }
    }

    /**
     * Add the coordinator job to the update list. When a previous status is known, a PAUSED or
     * PAUSEDWITHERROR status is put back on the job and the pending flag is restored to its loaded value.
     */
    @Override
    public void updateJob() {
        if (getPrevStatus() != null) {
            Job.Status coordJobStatus = getPrevStatus();
            if (coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
                coordJob.setStatus(coordJobStatus);
            }
            if (prevPending) {
                coordJob.setPending();
            }
            else {
                coordJob.resetPending();
            }
        }

        updateList.add(coordJob);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
     */
    @Override
    public XLog getLog() {
        return LOG;
    }

    /**
     * Record the current job status as the previous status, move the job to its rerun status and mark it
     * pending.
     */
    @Override
    public final void transitToNext() {
        prevStatus = coordJob.getStatus();
        if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
                || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
            coordJob.setStatus(Job.Status.RUNNING);
        }
        else {
            // Check for backward compatibility for Oozie versions (3.2 and before)
            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
            // PAUSEDWITHERROR is not supported
            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
        }
        // used for backward support of coordinator 0.1 schema
        coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
        coordJob.setPending();
    }

    /**
     * Put the coordinator job back to the status and pending flag it had before the rerun started.
     *
     * @throws CommandException declared for callers; not thrown by the statements in this method
     */
    private void transitToPrevious() throws CommandException {
        coordJob.setStatus(getPrevStatus());
        if (prevPending) {
            coordJob.setPending();
        }
        else {
            coordJob.resetPending();
        }
    }
}