001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Date; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.oozie.AppType; 037import org.apache.oozie.ErrorCode; 038import org.apache.oozie.WorkflowActionBean; 039import org.apache.oozie.WorkflowJobBean; 040import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; 041import org.apache.oozie.client.OozieClient; 042import org.apache.oozie.client.WorkflowAction; 043import org.apache.oozie.client.WorkflowJob; 044import org.apache.oozie.client.rest.JsonBean; 045import org.apache.oozie.command.CommandException; 046import org.apache.oozie.command.PreconditionException; 047import org.apache.oozie.executor.jpa.JPAExecutorException; 048import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 049import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 050import org.apache.oozie.executor.jpa.BatchQueryExecutor; 051import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 052import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 053import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 054import org.apache.oozie.service.ConfigurationService; 055import org.apache.oozie.service.DagXLogInfoService; 056import org.apache.oozie.service.HadoopAccessorException; 057import org.apache.oozie.service.HadoopAccessorService; 058import org.apache.oozie.service.Services; 059import org.apache.oozie.service.UUIDService; 060import org.apache.oozie.service.WorkflowAppService; 061import org.apache.oozie.service.WorkflowStoreService; 062import org.apache.oozie.sla.SLAOperations; 063import org.apache.oozie.sla.service.SLAService; 064import org.apache.oozie.util.ConfigUtils; 065import org.apache.oozie.util.ELEvaluator; 066import org.apache.oozie.util.ELUtils; 067import org.apache.oozie.util.InstrumentUtils; 068import org.apache.oozie.util.LogUtils; 069import org.apache.oozie.util.ParamChecker; 070import org.apache.oozie.util.PropertiesUtils; 071import org.apache.oozie.util.XConfiguration; 072import org.apache.oozie.util.XLog; 073import org.apache.oozie.util.XmlUtils; 074import org.apache.oozie.workflow.WorkflowApp; 075import org.apache.oozie.workflow.WorkflowException; 076import org.apache.oozie.workflow.WorkflowInstance; 077import org.apache.oozie.workflow.WorkflowLib; 078import org.apache.oozie.workflow.lite.NodeHandler; 079import org.jdom.Element; 080import org.jdom.JDOMException; 081 082/** 083 * This is a RerunXCommand which is used for rerunn. 084 * 085 */ 086public class ReRunXCommand extends WorkflowXCommand<Void> { 087 private final String jobId; 088 private Configuration conf; 089 private final Set<String> nodesToSkip = new HashSet<String>(); 090 public static final String TO_SKIP = "TO_SKIP"; 091 private WorkflowJobBean wfBean; 092 private List<WorkflowActionBean> actions; 093 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 094 private List<JsonBean> deleteList = new ArrayList<JsonBean>(); 095 096 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 097 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 098 public static final String DISABLE_CHILD_RERUN = "oozie.wf.rerun.disablechild"; 099 100 static { 101 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 102 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 103 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 104 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 105 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 106 107 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 108 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 109 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 110 } 111 112 public ReRunXCommand(String jobId, Configuration conf) { 113 super("rerun", "rerun", 1); 114 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 115 this.conf = ParamChecker.notNull(conf, "conf"); 116 } 117 118 @Override 119 protected void setLogInfo() { 120 LogUtils.setLogInfo(jobId); 121 } 122 123 /* (non-Javadoc) 124 * @see org.apache.oozie.command.XCommand#execute() 125 */ 126 @Override 127 protected Void execute() throws CommandException { 128 setupReRun(); 129 startWorkflow(jobId); 130 return null; 131 } 132 133 private void startWorkflow(String jobId) throws CommandException { 134 new StartXCommand(jobId).call(); 135 } 136 137 private void setupReRun() throws CommandException { 138 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 139 LogUtils.setLogInfo(wfBean); 140 WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance(); 141 WorkflowInstance newWfInstance; 142 String appPath = null; 143 144 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 145 try { 146 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 147 WorkflowApp app = wps.parseDef(conf, null); 148 XConfiguration protoActionConf = wps.createProtoActionConf(conf, true); 149 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 150 151 appPath = conf.get(OozieClient.APP_PATH); 152 URI uri = new URI(appPath); 153 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 154 Configuration fsConf = has.createConfiguration(uri.getAuthority()); 155 FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf); 156 157 Path configDefault = null; 158 // app path could be a directory 159 Path path = new Path(uri.getPath()); 160 if (!fs.isFile(path)) { 161 configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT); 162 } 163 else { 164 configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT); 165 } 166 167 if (fs.exists(configDefault)) { 168 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 169 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 170 XConfiguration.injectDefaults(defaultConf, conf); 171 } 172 173 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 174 175 // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are 176 // preserved. The Configuration.get function within XConfiguration.resolve() works recursively to get the 177 // final value corresponding to a key in the map Resetting the conf to contain all the resolved values is 178 // necessary to ensure propagation of Oozie properties to Hadoop calls downstream 179 conf = ((XConfiguration) conf).resolve(); 180 181 try { 182 newWfInstance = workflowLib.createInstance(app, conf, jobId); 183 } 184 catch (WorkflowException e) { 185 throw new CommandException(e); 186 } 187 String appName = ELUtils.resolveAppName(app.getName(), conf); 188 if (SLAService.isEnabled()) { 189 Element wfElem = XmlUtils.parseXml(app.getDefinition()); 190 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit"); 191 Element eSla = XmlUtils.getSLAElement(wfElem); 192 String jobSlaXml = null; 193 if (eSla != null) { 194 jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla); 195 } 196 writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(), 197 conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName, 198 evalSla); 199 } 200 wfBean.setAppName(appName); 201 wfBean.setProtoActionConf(protoActionConf.toXmlString()); 202 } 203 catch (WorkflowException ex) { 204 throw new CommandException(ex); 205 } 206 catch (IOException ex) { 207 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 208 } 209 catch (HadoopAccessorException ex) { 210 throw new CommandException(ex); 211 } 212 catch (URISyntaxException ex) { 213 throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex); 214 } 215 catch (Exception ex) { 216 throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex); 217 } 218 219 for (int i = 0; i < actions.size(); i++) { 220 // Skipping to delete the sub workflow when rerun failed node option has been provided. As same 221 // action will be used to rerun the job. 222 if (!nodesToSkip.contains(actions.get(i).getName()) && 223 !(conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) && 224 SubWorkflowActionExecutor.ACTION_TYPE.equals(actions.get(i).getType()))) { 225 deleteList.add(actions.get(i)); 226 LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId()); 227 } 228 else { 229 copyActionData(newWfInstance, oldWfInstance); 230 } 231 } 232 233 wfBean.setAppPath(conf.get(OozieClient.APP_PATH)); 234 wfBean.setConf(XmlUtils.prettyPrint(conf).toString()); 235 wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 236 wfBean.setUser(conf.get(OozieClient.USER_NAME)); 237 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 238 wfBean.setGroup(group); 239 wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 240 wfBean.setEndTime(null); 241 wfBean.setRun(wfBean.getRun() + 1); 242 wfBean.setStatus(WorkflowJob.Status.PREP); 243 wfBean.setWorkflowInstance(newWfInstance); 244 245 try { 246 wfBean.setLastModifiedTime(new Date()); 247 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_RERUN, wfBean)); 248 // call JPAExecutor to do the bulk writes 249 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList); 250 } 251 catch (JPAExecutorException je) { 252 throw new CommandException(je); 253 } 254 finally { 255 updateParentIfNecessary(wfBean); 256 } 257 258 } 259 260 @SuppressWarnings("unchecked") 261 private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user, 262 String appName, ELEvaluator evalSla) throws JDOMException, CommandException { 263 if (jobSlaXml != null && jobSlaXml.length() > 0) { 264 Element eSla = XmlUtils.parseXml(jobSlaXml); 265 // insert into new table 266 SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG, 267 true); 268 } 269 // Add sla for wf actions 270 for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) { 271 Element actionSla = XmlUtils.getSLAElement(action); 272 if (actionSla != null) { 273 String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla); 274 actionSla = XmlUtils.parseXml(actionSlaXml); 275 if (!nodesToSkip.contains(action.getAttributeValue("name"))) { 276 String actionId = Services.get().get(UUIDService.class) 277 .generateChildId(jobId, action.getAttributeValue("name") + ""); 278 SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user, 279 appName, LOG, true); 280 } 281 } 282 } 283 284 } 285 286 /** 287 * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the 288 * skipped node list 289 * 290 * @throws CommandException 291 */ 292 @Override 293 protected void eagerLoadState() throws CommandException { 294 try { 295 this.wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, this.jobId); 296 this.actions = WorkflowActionQueryExecutor.getInstance().getList( 297 WorkflowActionQuery.GET_ACTIONS_FOR_WORKFLOW_RERUN, this.jobId); 298 299 if (conf != null) { 300 if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes 301 Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES); 302 for (String str : skipNodes) { 303 // trimming is required 304 nodesToSkip.add(str.trim()); 305 } 306 LOG.debug("Skipnode size :" + nodesToSkip.size()); 307 } 308 else { 309 for (WorkflowActionBean action : actions) { // Rerun from failed nodes 310 if (action.getStatus() == WorkflowAction.Status.OK) { 311 nodesToSkip.add(action.getName()); 312 } 313 } 314 LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size()); 315 } 316 StringBuilder tmp = new StringBuilder(); 317 for (String node : nodesToSkip) { 318 tmp.append(node).append(","); 319 } 320 LOG.debug("SkipNode List :" + tmp); 321 } 322 } 323 catch (Exception ex) { 324 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 325 } 326 } 327 328 /** 329 * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed - 330 * The nodes that are to be skipped are to be completed successfully in the base run. 331 * 332 * @throws CommandException 333 * @throws PreconditionException On failure of pre-conditions 334 */ 335 @Override 336 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 337 // Throwing error if parent exist and same workflow trying to rerun, when running child workflow disabled 338 // through conf. 339 if (wfBean.getParentId() != null && !conf.getBoolean(SubWorkflowActionExecutor.SUBWORKFLOW_RERUN, false) 340 && ConfigurationService.getBoolean(DISABLE_CHILD_RERUN)) { 341 throw new CommandException(ErrorCode.E0755, " Rerun is not allowed through child workflow, please" + 342 " re-run through the parent " + wfBean.getParentId()); 343 } 344 345 if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED) 346 || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals( 347 WorkflowJob.Status.SUCCEEDED))) { 348 throw new CommandException(ErrorCode.E0805, wfBean.getStatus()); 349 } 350 Set<String> unmachedNodes = new HashSet<String>(nodesToSkip); 351 for (WorkflowActionBean action : actions) { 352 if (nodesToSkip.contains(action.getName())) { 353 if (!action.getStatus().equals(WorkflowAction.Status.OK) 354 && !action.getStatus().equals(WorkflowAction.Status.ERROR)) { 355 throw new CommandException(ErrorCode.E0806, action.getName()); 356 } 357 unmachedNodes.remove(action.getName()); 358 } 359 } 360 if (unmachedNodes.size() > 0) { 361 StringBuilder sb = new StringBuilder(); 362 String separator = ""; 363 for (String s : unmachedNodes) { 364 sb.append(separator).append(s); 365 separator = ","; 366 } 367 throw new CommandException(ErrorCode.E0807, sb); 368 } 369 } 370 371 /** 372 * Copys the variables for skipped nodes from the old wfInstance to new one. 373 * 374 * @param newWfInstance : Source WF instance object 375 * @param oldWfInstance : Update WF instance 376 */ 377 private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) { 378 Map<String, String> oldVars = new HashMap<String, String>(); 379 Map<String, String> newVars = new HashMap<String, String>(); 380 oldVars = oldWfInstance.getAllVars(); 381 for (String var : oldVars.keySet()) { 382 String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0]; 383 if (nodesToSkip.contains(actionName)) { 384 newVars.put(var, oldVars.get(var)); 385 } 386 } 387 for (String node : nodesToSkip) { 388 // Setting the TO_SKIP variable to true. This will be used by 389 // SignalCommand and LiteNodeHandler to skip the action. 390 newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true"); 391 String visitedFlag = NodeHandler.getLoopFlag(node); 392 // Removing the visited flag so that the action won't be considered 393 // a loop. 394 if (newVars.containsKey(visitedFlag)) { 395 newVars.remove(visitedFlag); 396 } 397 } 398 newWfInstance.setAllVars(newVars); 399 } 400 401 /* (non-Javadoc) 402 * @see org.apache.oozie.command.XCommand#getEntityKey() 403 */ 404 @Override 405 public String getEntityKey() { 406 return this.jobId; 407 } 408 409 /* (non-Javadoc) 410 * @see org.apache.oozie.command.XCommand#isLockRequired() 411 */ 412 @Override 413 protected boolean isLockRequired() { 414 return true; 415 } 416 417 /* (non-Javadoc) 418 * @see org.apache.oozie.command.XCommand#loadState() 419 */ 420 @Override 421 protected void loadState() throws CommandException { 422 try { 423 this.wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_RERUN, this.jobId); 424 this.actions = WorkflowActionQueryExecutor.getInstance().getList( 425 WorkflowActionQuery.GET_ACTIONS_FOR_WORKFLOW_RERUN, this.jobId); 426 } 427 catch (JPAExecutorException jpe) { 428 throw new CommandException(jpe); 429 } 430 } 431 432 /* (non-Javadoc) 433 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 434 */ 435 @Override 436 protected void verifyPrecondition() throws CommandException, PreconditionException { 437 eagerVerifyPrecondition(); 438 } 439}