001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Set; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.Path; 030 import org.apache.oozie.CoordinatorActionBean; 031 import org.apache.oozie.CoordinatorActionInfo; 032 import org.apache.oozie.CoordinatorJobBean; 033 import org.apache.oozie.ErrorCode; 034 import org.apache.oozie.XException; 035 import org.apache.oozie.action.ActionExecutorException; 036 import org.apache.oozie.action.hadoop.FsActionExecutor; 037 import org.apache.oozie.client.CoordinatorAction; 038 import org.apache.oozie.client.CoordinatorJob; 039 import org.apache.oozie.client.Job; 040 import org.apache.oozie.client.SLAEvent.SlaAppType; 041 import org.apache.oozie.client.rest.RestConstants; 042 import org.apache.oozie.command.CommandException; 043 import org.apache.oozie.command.PreconditionException; 044 import org.apache.oozie.command.RerunTransitionXCommand; 045 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 046 import org.apache.oozie.coord.CoordELFunctions; 047 import org.apache.oozie.coord.CoordUtils; 048 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 049 import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor; 050 import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor; 051 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 052 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 053 import org.apache.oozie.executor.jpa.JPAExecutorException; 054 import org.apache.oozie.service.JPAService; 055 import org.apache.oozie.service.Services; 056 import org.apache.oozie.util.DateUtils; 057 import org.apache.oozie.util.InstrumentUtils; 058 import org.apache.oozie.util.LogUtils; 059 import org.apache.oozie.util.ParamChecker; 060 import org.apache.oozie.util.StatusUtils; 061 import org.apache.oozie.util.XConfiguration; 062 import org.apache.oozie.util.XLog; 063 import org.apache.oozie.util.XmlUtils; 064 import org.apache.oozie.util.db.SLADbOperations; 065 import org.jdom.Element; 066 import org.jdom.JDOMException; 067 068 /** 069 * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup. 070 * <p/> 071 * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or 072 * {@link RestConstants.JOB_COORD_RERUN_ACTION}. 073 * <p/> 074 * The "refresh" is used to indicate if user wants to refresh an action's input and output events. 075 * <p/> 076 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 077 */ 078 public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> { 079 080 private String rerunType; 081 private String scope; 082 private boolean refresh; 083 private boolean noCleanup; 084 private CoordinatorJobBean coordJob = null; 085 private JPAService jpaService = null; 086 protected boolean prevPending; 087 088 /** 089 * The constructor for class {@link CoordRerunXCommand} 090 * 091 * @param jobId the job id 092 * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION} 093 * @param scope the rerun scope for given rerunType separated by "," 094 * @param refresh true if user wants to refresh input/output dataset urls 095 * @param noCleanup false if user wants to cleanup output events for given rerun actions 096 */ 097 public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) { 098 super("coord_rerun", "coord_rerun", 1); 099 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 100 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType"); 101 this.scope = ParamChecker.notEmpty(scope, "scope"); 102 this.refresh = refresh; 103 this.noCleanup = noCleanup; 104 } 105 106 /** 107 * Check if all given actions are eligible to rerun. 108 * 109 * @param actions list of CoordinatorActionBean 110 * @return true if all actions are eligible to rerun 111 */ 112 private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) { 113 ParamChecker.notNull(coordActions, "Coord actions to be rerun"); 114 boolean ret = false; 115 for (CoordinatorActionBean coordAction : coordActions) { 116 ret = true; 117 if (!coordAction.isTerminalStatus()) { 118 ret = false; 119 break; 120 } 121 } 122 return ret; 123 } 124 125 /** 126 * Get the list of actions for a given coordinator job 127 * @param rerunType the rerun type (date, action) 128 * @param jobId the coordinator job id 129 * @param scope the date scope or action id scope 130 * @return the list of Coordinator actions 131 * @throws CommandException 132 */ 133 public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{ 134 List<CoordinatorActionBean> coordActions = null; 135 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) { 136 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope); 137 } 138 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) { 139 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope); 140 } 141 return coordActions; 142 } 143 144 /** 145 * Cleanup output-events directories 146 * 147 * @param eAction coordinator action xml 148 * @param user user name 149 * @param group group name 150 */ 151 @SuppressWarnings("unchecked") 152 private void cleanupOutputEvents(Element eAction, String user, String group) { 153 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 154 if (outputList != null) { 155 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 156 if (data.getChild("uris", data.getNamespace()) != null) { 157 String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); 158 if (uris != null) { 159 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); 160 FsActionExecutor fsAe = new FsActionExecutor(); 161 for (String uri : uriArr) { 162 Path path = new Path(uri); 163 try { 164 fsAe.delete(user, group, path); 165 LOG.debug("Cleanup the output dir " + path); 166 } 167 catch (ActionExecutorException ae) { 168 LOG.warn("Failed to cleanup the output dir " + uri, ae); 169 } 170 } 171 } 172 173 } 174 } 175 } 176 else { 177 LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); 178 } 179 } 180 181 /** 182 * Refresh an action's input and ouput events. 183 * 184 * @param coordJob coordinator job bean 185 * @param coordAction coordinator action bean 186 * @throws Exception thrown if failed to materialize coordinator action 187 */ 188 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception { 189 Configuration jobConf = null; 190 try { 191 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 192 } 193 catch (IOException ioe) { 194 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 195 throw new CommandException(ErrorCode.E1005, ioe); 196 } 197 String jobXml = coordJob.getJobXml(); 198 Element eJob = XmlUtils.parseXml(jobXml); 199 Date actualTime = new Date(); 200 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction 201 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction); 202 LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml=" 203 + XmlUtils.prettyPrint(actionXml).toString()); 204 coordAction.setActionXml(actionXml); 205 } 206 207 /** 208 * Update an action into database table 209 * 210 * @param coordJob coordinator job bean 211 * @param coordAction coordinator action bean 212 * @param actionXml coordinator action xml 213 * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event 214 */ 215 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml) 216 throws Exception { 217 LOG.debug("updateAction for actionId=" + coordAction.getId()); 218 if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) { 219 LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId()); 220 coordAction.setCreatedTime(new Date()); 221 } 222 coordAction.setStatus(CoordinatorAction.Status.WAITING); 223 coordAction.setExternalId(""); 224 coordAction.setExternalStatus(""); 225 coordAction.setRerunTime(new Date()); 226 coordAction.setLastModifiedTime(new Date()); 227 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction)); 228 writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup()); 229 } 230 231 /** 232 * Create SLA RegistrationEvent 233 * 234 * @param actionXml action xml 235 * @param actionBean coordinator action bean 236 * @param user user name 237 * @param group group name 238 * @throws Exception thrown if unable to write sla registration event 239 */ 240 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group) 241 throws Exception { 242 Element eAction = XmlUtils.parseXml(actionXml); 243 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 244 SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group, 245 LOG); 246 } 247 248 /* (non-Javadoc) 249 * @see org.apache.oozie.command.XCommand#getEntityKey() 250 */ 251 @Override 252 public String getEntityKey() { 253 return jobId; 254 } 255 256 /* (non-Javadoc) 257 * @see org.apache.oozie.command.XCommand#isLockRequired() 258 */ 259 @Override 260 protected boolean isLockRequired() { 261 return true; 262 } 263 264 /* (non-Javadoc) 265 * @see org.apache.oozie.command.XCommand#loadState() 266 */ 267 @Override 268 protected void loadState() throws CommandException { 269 jpaService = Services.get().get(JPAService.class); 270 if (jpaService == null) { 271 throw new CommandException(ErrorCode.E0610); 272 } 273 try { 274 coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 275 prevPending = coordJob.isPending(); 276 } 277 catch (JPAExecutorException je) { 278 throw new CommandException(je); 279 } 280 LogUtils.setLogInfo(coordJob, logInfo); 281 } 282 283 /* (non-Javadoc) 284 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 285 */ 286 @Override 287 protected void verifyPrecondition() throws CommandException, PreconditionException { 288 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED 289 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 290 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 291 throw new CommandException(ErrorCode.E1018, 292 "coordinator job is killed or failed so all actions are not eligible to rerun!"); 293 } 294 295 // no actioins have been created for PREP job 296 if (coordJob.getStatus() == CoordinatorJob.Status.PREP) { 297 LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId); 298 throw new CommandException(ErrorCode.E1018, 299 "coordinator job is PREP so no actions are materialized to rerun!"); 300 } 301 } 302 303 @Override 304 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 305 verifyPrecondition(); 306 } 307 308 @Override 309 public void rerunChildren() throws CommandException { 310 boolean isError = false; 311 try { 312 CoordinatorActionInfo coordInfo = null; 313 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 314 List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope); 315 if (checkAllActionsRunnable(coordActions)) { 316 for (CoordinatorActionBean coordAction : coordActions) { 317 String actionXml = coordAction.getActionXml(); 318 if (!noCleanup) { 319 Element eAction = XmlUtils.parseXml(actionXml); 320 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup()); 321 } 322 if (refresh) { 323 refreshAction(coordJob, coordAction); 324 } 325 updateAction(coordJob, coordAction, actionXml); 326 327 queue(new CoordActionNotificationXCommand(coordAction), 100); 328 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100); 329 } 330 } 331 else { 332 isError = true; 333 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!"); 334 } 335 coordInfo = new CoordinatorActionInfo(coordActions); 336 337 ret = coordInfo; 338 } 339 catch (XException xex) { 340 isError = true; 341 throw new CommandException(xex); 342 } 343 catch (JDOMException jex) { 344 isError = true; 345 throw new CommandException(ErrorCode.E0700, jex); 346 } 347 catch (Exception ex) { 348 isError = true; 349 throw new CommandException(ErrorCode.E1018, ex); 350 } 351 finally{ 352 if(isError){ 353 transitToPrevious(); 354 } 355 } 356 } 357 358 /* 359 * (non-Javadoc) 360 * @see org.apache.oozie.command.TransitionXCommand#getJob() 361 */ 362 @Override 363 public Job getJob() { 364 return coordJob; 365 } 366 367 @Override 368 public void notifyParent() throws CommandException { 369 //update bundle action 370 if (getPrevStatus() != null && coordJob.getBundleId() != null) { 371 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus()); 372 bundleStatusUpdate.call(); 373 } 374 } 375 376 @Override 377 public void updateJob() throws CommandException { 378 try { 379 // rerun a paused coordinator job will keep job status at paused and pending at previous pending 380 if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) { 381 coordJob.setStatus(Job.Status.PAUSED); 382 if (prevPending) { 383 coordJob.setPending(); 384 } else { 385 coordJob.resetPending(); 386 } 387 } 388 389 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 390 } 391 catch (JPAExecutorException je) { 392 throw new CommandException(je); 393 } 394 } 395 396 /* (non-Javadoc) 397 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 398 */ 399 @Override 400 public XLog getLog() { 401 return LOG; 402 } 403 404 @Override 405 public final void transitToNext() { 406 prevStatus = coordJob.getStatus(); 407 coordJob.setStatus(Job.Status.RUNNING); 408 // used for backward support of coordinator 0.1 schema 409 coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus)); 410 coordJob.setPending(); 411 } 412 413 private final void transitToPrevious() throws CommandException { 414 coordJob.setStatus(getPrevStatus()); 415 if (!prevPending) { 416 coordJob.resetPending(); 417 } 418 else { 419 coordJob.setPending(); 420 } 421 } 422 }