/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * This is a RerunXCommand which is used for rerunning a workflow job.
 * <p>
 * The command loads the workflow job and its actions, works out which nodes are to be skipped, builds a new
 * workflow instance from the supplied configuration, deletes the actions that are not skipped and stores the
 * job back in {@code PREP} status with its run counter incremented.
 */
public class ReRunXCommand extends WorkflowXCommand<Void> {
    private final String jobId;
    private Configuration conf;
    private final Set<String> nodesToSkip = new HashSet<String>();
    /** Name of the per-node workflow variable that this command sets to {@code "true"} for every skipped node. */
    public static final String TO_SKIP = "TO_SKIP";
    private WorkflowJobBean wfBean;
    private List<WorkflowActionBean> actions;
    private JPAService jpaService;
    private final List<JsonBean> updateList = new ArrayList<JsonBean>();
    private final List<JsonBean> deleteList = new ArrayList<JsonBean>();

    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-properties set is built from the user properties plus HADOOP_USER.
        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Creates a rerun command.
     *
     * @param jobId id of the workflow job to rerun; checked with {@link ParamChecker#notEmpty}
     * @param conf rerun configuration; checked with {@link ParamChecker#notNull}
     */
    public ReRunXCommand(String jobId, Configuration conf) {
        super("rerun", "rerun", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.conf = ParamChecker.notNull(conf, "conf");
    }

    /**
     * Rebuilds the workflow instance from the rerun configuration and persists the job for a new run.
     * <p>
     * Steps: parse the workflow definition, inject defaults from the config-default file next to the
     * application (if that file exists), reject disallowed properties, resolve the configuration, create the
     * new workflow instance, optionally write SLA registrations, queue every non-skipped action for deletion,
     * copy the variables of skipped nodes into the new instance, and write the updated job bean together with
     * the deletions through a single bulk JPA call.
     *
     * @return always {@code null}
     * @throws CommandException if any of the above steps fails
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        LogUtils.setLogInfo(wfBean, logInfo);
        WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
        WorkflowInstance newWfInstance;
        String appPath = null;

        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            WorkflowApp app = wps.parseDef(conf);
            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            appPath = conf.get(OozieClient.APP_PATH);
            URI uri = new URI(appPath);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);

            Path configDefault = null;
            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
            }
            else {
                configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                Configuration defaultConf;
                // Close the stream once the configuration has been read so the file handle is not leaked.
                InputStream defaultConfStream = fs.open(configDefault);
                try {
                    defaultConf = new XConfiguration(defaultConfStream);
                }
                finally {
                    defaultConfStream.close();
                }
                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                XConfiguration.injectDefaults(defaultConf, conf);
            }

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics
            // are preserved. The Configuration.get function within XConfiguration.resolve() works recursively
            // to get the final value corresponding to a key in the map. Resetting the conf to contain all the
            // resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
            conf = ((XConfiguration) conf).resolve();

            try {
                newWfInstance = workflowLib.createInstance(app, conf, jobId);
            }
            catch (WorkflowException e) {
                throw new CommandException(e);
            }

            if (SLAService.isEnabled()) {
                Element wfElem = XmlUtils.parseXml(app.getDefinition());
                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
                Element eSla = XmlUtils.getSLAElement(wfElem);
                String jobSlaXml = null;
                if (eSla != null) {
                    jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
                }
                String appName = ELUtils.resolveAppName(app.getName(), conf);
                writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
                        conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
                        evalSla);
            }
            wfBean.setAppName(app.getName());
            wfBean.setProtoActionConf(protoActionConf.toXmlString());
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (IOException ex) {
            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CommandException(ex);
        }
        catch (URISyntaxException ex) {
            throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
        }
        catch (Exception ex) {
            // NOTE(review): this also catches a CommandException raised inside the try block above (for
            // example the one wrapping a WorkflowException from createInstance) and reports it as E1007.
            throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
        }

        // Actions that are not skipped are removed so they can run again. The variables of skipped nodes do
        // not depend on the action being visited, so they are copied once after the loop.
        boolean anyActionSkipped = false;
        for (WorkflowActionBean action : actions) {
            if (nodesToSkip.contains(action.getName())) {
                anyActionSkipped = true;
            }
            else {
                deleteList.add(action);
                LOG.info("Deleting Action[{0}] for re-run", action.getId());
            }
        }
        if (anyActionSkipped) {
            copyActionData(newWfInstance, oldWfInstance);
        }

        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
        wfBean.setUser(conf.get(OozieClient.USER_NAME));
        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
        wfBean.setGroup(group);
        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
        wfBean.setEndTime(null);
        wfBean.setRun(wfBean.getRun() + 1);
        wfBean.setStatus(WorkflowJob.Status.PREP);
        wfBean.setWorkflowInstance(newWfInstance);

        try {
            wfBean.setLastModifiedTime(new Date());
            updateList.add(wfBean);
            // call JPAExecutor to do the bulk writes
            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }

        return null;
    }

    /**
     * Creates the SLA registration events for the workflow job and for each of its actions that is not skipped.
     *
     * @param wfElem root element of the parsed workflow definition
     * @param jobSlaXml resolved job-level SLA XML; no job-level event is created when null or empty
     * @param id id of the new workflow instance; currently unused, the command's {@code jobId} is used instead
     * @param parentId parent id passed to the job-level registration
     * @param user user name passed to the registrations
     * @param appName application name passed to the registrations
     * @param evalSla evaluator used to resolve the action-level SLA elements
     * @throws JDOMException if an SLA XML fragment cannot be parsed
     * @throws CommandException if creating a registration event fails
     */
    @SuppressWarnings("unchecked")
    private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
            String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
        if (jobSlaXml != null && jobSlaXml.length() > 0) {
            Element eSla = XmlUtils.parseXml(jobSlaXml);
            // insert into new table
            SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
                    true);
        }
        // Add sla for wf actions
        for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
            Element actionSla = XmlUtils.getSLAElement(action);
            if (actionSla != null) {
                String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
                actionSla = XmlUtils.parseXml(actionSlaXml);
                if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
                    String actionId = Services.get().get(UUIDService.class)
                            .generateChildId(jobId, action.getAttributeValue("name") + "");
                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
                            appName, LOG, true);
                }
            }
        }

    }

    /**
     * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
     * skipped node list.
     * <p>
     * When {@link OozieClient#RERUN_FAIL_NODES} is true, every action whose status is {@code OK} is skipped;
     * otherwise the (trimmed) names listed under {@link OozieClient#RERUN_SKIP_NODES} are skipped.
     *
     * @throws CommandException with code E0603 if loading or parsing fails
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        super.eagerLoadState();
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
                this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
            }
            else {
                // NOTE(review): this exception is caught by the catch block below and re-reported as E0603.
                throw new CommandException(ErrorCode.E0610);
            }

            if (conf != null) {
                if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) {
                    for (WorkflowActionBean action : actions) { // Rerun from failed nodes
                        if (action.getStatus() == WorkflowAction.Status.OK) {
                            nodesToSkip.add(action.getName());
                        }
                    }
                    LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
                }
                else { //Rerun with skipNodes
                    Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
                    for (String str : skipNodes) {
                        // trimming is required
                        nodesToSkip.add(str.trim());
                    }
                    LOG.debug("Skipnode size :" + nodesToSkip.size());
                }
                StringBuilder tmp = new StringBuilder();
                for (String node : nodesToSkip) {
                    tmp.append(node).append(",");
                }
                LOG.debug("SkipNode List :" + tmp);
            }
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
        }
    }

    /**
     * Checks the pre-conditions that are required for workflow to recover:
     * <ul>
     * <li>the workflow job must be in status {@code FAILED}, {@code KILLED} or {@code SUCCEEDED};</li>
     * <li>every action named in the skip list must have status {@code OK} or {@code ERROR};</li>
     * <li>every node in the skip list must match an action of the job.</li>
     * </ul>
     *
     * @throws CommandException on failure of pre-conditions (E0805, E0806 or E0807 respectively)
     * @throws PreconditionException declared by the overridden method
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        super.eagerVerifyPrecondition();
        if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
                || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
                        WorkflowJob.Status.SUCCEEDED))) {
            throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
        }
        // Start from the full skip list and strike out each name that matches an existing action.
        Set<String> unmatchedNodes = new HashSet<String>(nodesToSkip);
        for (WorkflowActionBean action : actions) {
            if (nodesToSkip.contains(action.getName())) {
                if (!action.getStatus().equals(WorkflowAction.Status.OK)
                        && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
                    throw new CommandException(ErrorCode.E0806, action.getName());
                }
                unmatchedNodes.remove(action.getName());
            }
        }
        if (unmatchedNodes.size() > 0) {
            StringBuilder sb = new StringBuilder();
            String separator = "";
            for (String s : unmatchedNodes) {
                sb.append(separator).append(s);
                separator = ",";
            }
            throw new CommandException(ErrorCode.E0807, sb);
        }
    }

    /**
     * Copies the variables for skipped nodes from the old wfInstance to the new one.
     * <p>
     * A variable is copied when the part of its name before {@link WorkflowInstance#NODE_VAR_SEPARATOR} is a
     * skipped node. In addition, for every skipped node the {@link #TO_SKIP} variable is set to {@code "true"}
     * and the node's loop flag is removed. The resulting map replaces all variables of the new instance.
     *
     * @param newWfInstance : WF instance object that receives the variables
     * @param oldWfInstance : WF instance object the variables are read from
     */
    private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
        Map<String, String> oldVars = oldWfInstance.getAllVars();
        Map<String, String> newVars = new HashMap<String, String>();
        for (Map.Entry<String, String> oldVar : oldVars.entrySet()) {
            String actionName = oldVar.getKey().split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
            if (nodesToSkip.contains(actionName)) {
                newVars.put(oldVar.getKey(), oldVar.getValue());
            }
        }
        for (String node : nodesToSkip) {
            // Setting the TO_SKIP variable to true. This will be used by
            // SignalCommand and LiteNodeHandler to skip the action.
            newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
            // Removing the visited flag so that the action won't be considered
            // a loop.
            newVars.remove(NodeHandler.getLoopFlag(node));
        }
        newWfInstance.setAllVars(newVars);
    }

    /**
     * Returns the id of the workflow job this command operates on.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Always returns {@code true}.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Delegates to {@link #eagerLoadState()}.
     *
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        eagerLoadState();
    }

    /**
     * Delegates to {@link #eagerVerifyPrecondition()}.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        eagerVerifyPrecondition();
    }
}