001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.Date; 023 import java.util.List; 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.hadoop.fs.Path; 026 import org.apache.oozie.CoordinatorActionBean; 027 import org.apache.oozie.CoordinatorActionInfo; 028 import org.apache.oozie.CoordinatorJobBean; 029 import org.apache.oozie.ErrorCode; 030 import org.apache.oozie.SLAEventBean; 031 import org.apache.oozie.XException; 032 import org.apache.oozie.action.ActionExecutorException; 033 import org.apache.oozie.action.hadoop.FsActionExecutor; 034 import org.apache.oozie.client.CoordinatorAction; 035 import org.apache.oozie.client.CoordinatorJob; 036 import org.apache.oozie.client.Job; 037 import org.apache.oozie.client.SLAEvent.SlaAppType; 038 import org.apache.oozie.client.rest.RestConstants; 039 import org.apache.oozie.command.CommandException; 040 import org.apache.oozie.command.PreconditionException; 041 import org.apache.oozie.command.RerunTransitionXCommand; 042 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 043 import org.apache.oozie.coord.CoordELFunctions; 044 import org.apache.oozie.coord.CoordUtils; 045 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 046 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 047 import org.apache.oozie.executor.jpa.JPAExecutorException; 048 import org.apache.oozie.service.JPAService; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.util.InstrumentUtils; 051 import org.apache.oozie.util.LogUtils; 052 import org.apache.oozie.util.ParamChecker; 053 import org.apache.oozie.util.StatusUtils; 054 import org.apache.oozie.util.XConfiguration; 055 import org.apache.oozie.util.XLog; 056 import org.apache.oozie.util.XmlUtils; 057 import org.apache.oozie.util.db.SLADbOperations; 058 import org.jdom.Element; 059 import org.jdom.JDOMException; 060 061 /** 062 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup. 063 * <p/> 064 * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or 065 * {@link RestConstants.JOB_COORD_RERUN_ACTION}. 066 * <p/> 067 * The "refresh" is used to indicate if user wants to refresh an action's input and output events. 068 * <p/> 069 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 070 */ 071 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> { 072 073 private String rerunType; 074 private String scope; 075 private boolean refresh; 076 private boolean noCleanup; 077 private CoordinatorJobBean coordJob = null; 078 private JPAService jpaService = null; 079 protected boolean prevPending; 080 081 /** 082 * The constructor for class {@link CoordRerunXCommand} 083 * 084 * @param jobId the job id 085 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION} 086 * @param scope the rerun scope for given rerunType separated by "," 087 * @param refresh true if user wants to refresh input/output dataset urls 088 * @param noCleanup false if user wants to cleanup output events for given rerun actions 089 */ 090 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) { 091 super("coord_rerun", "coord_rerun", 1); 092 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 093 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType"); 094 this.scope = ParamChecker.notEmpty(scope, "scope"); 095 this.refresh = refresh; 096 this.noCleanup = noCleanup; 097 } 098 099 /** 100 * Check if all given actions are eligible to rerun. 101 * 102 * @param actions list of CoordinatorActionBean 103 * @return true if all actions are eligible to rerun 104 */ 105 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) { 106 ParamChecker.notNull(coordActions, "Coord actions to be rerun"); 107 boolean ret = false; 108 for (CoordinatorActionBean coordAction : coordActions) { 109 ret = true; 110 if (!coordAction.isTerminalStatus()) { 111 ret = false; 112 break; 113 } 114 } 115 return ret; 116 } 117 118 /** 119 * Get the list of actions for a given coordinator job 120 * @param rerunType the rerun type (date, action) 121 * @param jobId the coordinator job id 122 * @param scope the date scope or action id scope 123 * @return the list of Coordinator actions 124 * @throws CommandException 125 */ 126 public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{ 127 List<CoordinatorActionBean> coordActions = null; 128 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) { 129 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope); 130 } 131 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) { 132 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope); 133 } 134 return coordActions; 135 } 136 137 /** 138 * Cleanup output-events directories 139 * 140 * @param eAction coordinator action xml 141 * @param user user name 142 * @param group group name 143 */ 144 @SuppressWarnings("unchecked") 145 private void cleanupOutputEvents(Element eAction, String user, String group) { 146 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 147 if (outputList != null) { 148 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 149 if (data.getChild("uris", data.getNamespace()) != null) { 150 String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); 151 if (uris != null) { 152 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); 153 FsActionExecutor fsAe = new FsActionExecutor(); 154 for (String uri : uriArr) { 155 Path path = new Path(uri); 156 try { 157 fsAe.delete(user, group, path); 158 LOG.debug("Cleanup the output dir " + path); 159 } 160 catch (ActionExecutorException ae) { 161 LOG.warn("Failed to cleanup the output dir " + uri, ae); 162 } 163 } 164 } 165 166 } 167 } 168 } 169 else { 170 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); 171 } 172 } 173 174 /** 175 * Refresh an action's input and ouput events. 176 * 177 * @param coordJob coordinator job bean 178 * @param coordAction coordinator action bean 179 * @throws Exception thrown if failed to materialize coordinator action 180 */ 181 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception { 182 Configuration jobConf = null; 183 try { 184 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 185 } 186 catch (IOException ioe) { 187 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 188 throw new CommandException(ErrorCode.E1005, ioe); 189 } 190 String jobXml = coordJob.getJobXml(); 191 Element eJob = XmlUtils.parseXml(jobXml); 192 Date actualTime = new Date(); 193 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction 194 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction); 195 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml=" 196 + XmlUtils.prettyPrint(actionXml).toString()); 197 coordAction.setActionXml(actionXml); 198 } 199 200 /** 201 * Update an action into database table 202 * 203 * @param coordJob coordinator job bean 204 * @param coordAction coordinator action bean 205 * @param actionXml coordinator action xml 206 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event 207 */ 208 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml) 209 throws Exception { 210 LOG.debug("updateAction for actionId=" + coordAction.getId()); 211 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) { 212 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId()); 213 coordAction.setCreatedTime(new Date()); 214 } 215 coordAction.setStatus(CoordinatorAction.Status.WAITING); 216 coordAction.setExternalId(""); 217 coordAction.setExternalStatus(""); 218 coordAction.setRerunTime(new Date()); 219 coordAction.setLastModifiedTime(new Date()); 220 updateList.add(coordAction); 221 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup()); 222 } 223 224 /** 225 * Create SLA RegistrationEvent 226 * 227 * @param actionXml action xml 228 * @param actionBean coordinator action bean 229 * @param user user name 230 * @param group group name 231 * @throws Exception thrown if unable to write sla registration event 232 */ 233 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group) 234 throws Exception { 235 Element eAction = XmlUtils.parseXml(actionXml); 236 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 237 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), 238 SlaAppType.COORDINATOR_ACTION, user, group, LOG); 239 if(slaEvent != null) { 240 insertList.add(slaEvent); 241 } 242 } 243 244 /* (non-Javadoc) 245 * @see org.apache.oozie.command.XCommand#getEntityKey() 246 */ 247 @Override 248 public String getEntityKey() { 249 return jobId; 250 } 251 252 /* (non-Javadoc) 253 * @see org.apache.oozie.command.XCommand#isLockRequired() 254 */ 255 @Override 256 protected boolean isLockRequired() { 257 return true; 258 } 259 260 /* (non-Javadoc) 261 * @see org.apache.oozie.command.XCommand#loadState() 262 */ 263 @Override 264 protected void loadState() throws CommandException { 265 jpaService = Services.get().get(JPAService.class); 266 if (jpaService == null) { 267 throw new CommandException(ErrorCode.E0610); 268 } 269 try { 270 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 271 prevPending = coordJob.isPending(); 272 } 273 catch (JPAExecutorException je) { 274 throw new CommandException(je); 275 } 276 LogUtils.setLogInfo(coordJob, logInfo); 277 } 278 279 /* (non-Javadoc) 280 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 281 */ 282 @Override 283 protected void verifyPrecondition() throws CommandException, PreconditionException { 284 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED 285 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 286 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 287 throw new CommandException(ErrorCode.E1018, 288 "coordinator job is killed or failed so all actions are not eligible to rerun!"); 289 } 290 291 // no actioins have been created for PREP job 292 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 293 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 294 throw new CommandException(ErrorCode.E1018, 295 "coordinator job is PREP so no actions are materialized to rerun!"); 296 } 297 } 298 299 @Override 300 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 301 verifyPrecondition(); 302 } 303 304 @Override 305 public void rerunChildren() throws CommandException { 306 boolean isError = false; 307 try { 308 CoordinatorActionInfo coordInfo = null; 309 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 310 List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope); 311 if (checkAllActionsRunnable(coordActions)) { 312 for (CoordinatorActionBean coordAction : coordActions) { 313 String actionXml = coordAction.getActionXml(); 314 if (!noCleanup) { 315 Element eAction = XmlUtils.parseXml(actionXml); 316 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup()); 317 } 318 if (refresh) { 319 refreshAction(coordJob, coordAction); 320 } 321 updateAction(coordJob, coordAction, actionXml); 322 323 queue(new CoordActionNotificationXCommand(coordAction), 100); 324 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100); 325 } 326 } 327 else { 328 isError = true; 329 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!"); 330 } 331 coordInfo = new CoordinatorActionInfo(coordActions); 332 333 ret = coordInfo; 334 } 335 catch (XException xex) { 336 isError = true; 337 throw new CommandException(xex); 338 } 339 catch (JDOMException jex) { 340 isError = true; 341 throw new CommandException(ErrorCode.E0700, jex); 342 } 343 catch (Exception ex) { 344 isError = true; 345 throw new CommandException(ErrorCode.E1018, ex); 346 } 347 finally{ 348 if(isError){ 349 transitToPrevious(); 350 } 351 } 352 } 353 354 /* 355 * (non-Javadoc) 356 * @see org.apache.oozie.command.TransitionXCommand#getJob() 357 */ 358 @Override 359 public Job getJob() { 360 return coordJob; 361 } 362 363 @Override 364 public void notifyParent() throws CommandException { 365 //update bundle action 366 if (getPrevStatus() != null && coordJob.getBundleId() != null) { 367 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus()); 368 bundleStatusUpdate.call(); 369 } 370 } 371 372 @Override 373 public void updateJob() { 374 if (getPrevStatus()!= null){ 375 Job.Status coordJobStatus = getPrevStatus(); 376 if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) { 377 coordJob.setStatus(coordJobStatus); 378 } 379 if (prevPending) { 380 coordJob.setPending(); 381 } else { 382 coordJob.resetPending(); 383 } 384 } 385 386 updateList.add(coordJob); 387 } 388 389 /* (non-Javadoc) 390 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites() 391 */ 392 @Override 393 public void performWrites() throws CommandException { 394 try { 395 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 396 } 397 catch (JPAExecutorException e) { 398 throw new CommandException(e); 399 } 400 } 401 402 /* (non-Javadoc) 403 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 404 */ 405 @Override 406 public XLog getLog() { 407 return LOG; 408 } 409 410 @Override 411 public final void transitToNext() { 412 prevStatus = coordJob.getStatus(); 413 if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED 414 || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) { 415 coordJob.setStatus(Job.Status.RUNNING); 416 } 417 else { 418 // Check for backward compatibility for Oozie versions (3.2 and before) 419 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 420 // PAUSEDWITHERROR is not supported 421 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); 422 } 423 // used for backward support of coordinator 0.1 schema 424 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus)); 425 coordJob.setPending(); 426 } 427 428 private final void transitToPrevious() throws CommandException { 429 coordJob.setStatus(getPrevStatus()); 430 if (!prevPending) { 431 coordJob.resetPending(); 432 } 433 else { 434 coordJob.setPending(); 435 } 436 } 437 }