001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import java.net.URI; 022 import java.net.URISyntaxException; 023 import java.util.ArrayList; 024 import java.util.Collection; 025 import java.util.Date; 026 import java.util.HashMap; 027 import java.util.HashSet; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.Set; 031 032 import org.apache.hadoop.conf.Configuration; 033 import org.apache.hadoop.fs.FileSystem; 034 import org.apache.hadoop.fs.Path; 035 import org.apache.oozie.ErrorCode; 036 import org.apache.oozie.WorkflowActionBean; 037 import org.apache.oozie.WorkflowJobBean; 038 import org.apache.oozie.client.OozieClient; 039 import org.apache.oozie.client.WorkflowAction; 040 import org.apache.oozie.client.WorkflowJob; 041 import org.apache.oozie.client.rest.JsonBean; 042 import org.apache.oozie.command.CommandException; 043 import org.apache.oozie.command.PreconditionException; 044 import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor; 045 import org.apache.oozie.executor.jpa.JPAExecutorException; 046 import 
org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; 047 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 048 import org.apache.oozie.service.DagXLogInfoService; 049 import org.apache.oozie.service.HadoopAccessorException; 050 import org.apache.oozie.service.HadoopAccessorService; 051 import org.apache.oozie.service.JPAService; 052 import org.apache.oozie.service.Services; 053 import org.apache.oozie.service.WorkflowAppService; 054 import org.apache.oozie.service.WorkflowStoreService; 055 import org.apache.oozie.util.ConfigUtils; 056 import org.apache.oozie.util.InstrumentUtils; 057 import org.apache.oozie.util.LogUtils; 058 import org.apache.oozie.util.ParamChecker; 059 import org.apache.oozie.util.PropertiesUtils; 060 import org.apache.oozie.util.XConfiguration; 061 import org.apache.oozie.util.XLog; 062 import org.apache.oozie.util.XmlUtils; 063 import org.apache.oozie.workflow.WorkflowApp; 064 import org.apache.oozie.workflow.WorkflowException; 065 import org.apache.oozie.workflow.WorkflowInstance; 066 import org.apache.oozie.workflow.WorkflowLib; 067 import org.apache.oozie.workflow.lite.NodeHandler; 068 069 /** 070 * This is a RerunXCommand which is used for rerunning a workflow job.
071 * 072 */ 073 public class ReRunXCommand extends WorkflowXCommand<Void> { 074 private final String jobId; 075 private Configuration conf; 076 private final String authToken; 077 private final Set<String> nodesToSkip = new HashSet<String>(); 078 public static final String TO_SKIP = "TO_SKIP"; 079 private WorkflowJobBean wfBean; 080 private List<WorkflowActionBean> actions; 081 private JPAService jpaService; 082 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 083 private List<JsonBean> deleteList = new ArrayList<JsonBean>(); 084 085 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 086 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 087 088 static { 089 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 090 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 091 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 092 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 093 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 094 095 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 097 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 098 } 099 100 public ReRunXCommand(String jobId, Configuration conf, String authToken) { 101 super("rerun", "rerun", 1); 102 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 103 this.conf = ParamChecker.notNull(conf, "conf"); 104 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 105 } 106 107 /* (non-Javadoc) 108 * @see org.apache.oozie.command.XCommand#execute() 109 */ 110 @Override 111 protected Void execute() throws CommandException { 112 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 113 LogUtils.setLogInfo(wfBean, 
logInfo); 114 WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance(); 115 WorkflowInstance newWfInstance; 116 String appPath = null; 117 118 WorkflowAppService wps = Services.get().get(WorkflowAppService.class); 119 try { 120 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 121 WorkflowApp app = wps.parseDef(conf, authToken); 122 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true); 123 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); 124 125 appPath = conf.get(OozieClient.APP_PATH); 126 URI uri = new URI(appPath); 127 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 128 Configuration fsConf = has.createJobConf(uri.getAuthority()); 129 FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf); 130 131 Path configDefault = null; 132 // app path could be a directory 133 Path path = new Path(uri.getPath()); 134 if (!fs.isFile(path)) { 135 configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT); 136 } else { 137 configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT); 138 } 139 140 if (fs.exists(configDefault)) { 141 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 142 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 143 XConfiguration.injectDefaults(defaultConf, conf); 144 } 145 146 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 147 148 // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are preserved. 
149 // The Configuration.get function within XConfiguration.resolve() works recursively to get the final value corresponding to a key in the map 150 // Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream 151 conf = ((XConfiguration) conf).resolve(); 152 153 try { 154 newWfInstance = workflowLib.createInstance(app, conf, jobId); 155 } 156 catch (WorkflowException e) { 157 throw new CommandException(e); 158 } 159 wfBean.setAppName(app.getName()); 160 wfBean.setProtoActionConf(protoActionConf.toXmlString()); 161 } 162 catch (WorkflowException ex) { 163 throw new CommandException(ex); 164 } 165 catch (IOException ex) { 166 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 167 } 168 catch (HadoopAccessorException ex) { 169 throw new CommandException(ex); 170 } 171 catch (URISyntaxException ex) { 172 throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex); 173 } 174 175 for (int i = 0; i < actions.size(); i++) { 176 if (!nodesToSkip.contains(actions.get(i).getName())) { 177 deleteList.add(actions.get(i)); 178 LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId()); 179 } 180 else { 181 copyActionData(newWfInstance, oldWfInstance); 182 } 183 } 184 185 wfBean.setAppPath(conf.get(OozieClient.APP_PATH)); 186 wfBean.setConf(XmlUtils.prettyPrint(conf).toString()); 187 wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, "")); 188 wfBean.setUser(conf.get(OozieClient.USER_NAME)); 189 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 190 wfBean.setGroup(group); 191 wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); 192 wfBean.setEndTime(null); 193 wfBean.setRun(wfBean.getRun() + 1); 194 wfBean.setStatus(WorkflowJob.Status.PREP); 195 wfBean.setWorkflowInstance(newWfInstance); 196 197 try { 198 wfBean.setLastModifiedTime(new Date()); 199 updateList.add(wfBean); 200 // call JPAExecutor to do 
the bulk writes 201 jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true)); 202 } 203 catch (JPAExecutorException je) { 204 throw new CommandException(je); 205 } 206 207 return null; 208 } 209 210 /** 211 * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the 212 * skipped node list 213 * 214 * @throws CommandException 215 */ 216 @Override 217 protected void eagerLoadState() throws CommandException { 218 super.eagerLoadState(); 219 try { 220 jpaService = Services.get().get(JPAService.class); 221 if (jpaService != null) { 222 this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId)); 223 this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId)); 224 } 225 else { 226 throw new CommandException(ErrorCode.E0610); 227 } 228 229 if (conf != null) { 230 if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes 231 Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES); 232 for (String str : skipNodes) { 233 // trimming is required 234 nodesToSkip.add(str.trim()); 235 } 236 LOG.debug("Skipnode size :" + nodesToSkip.size()); 237 } 238 else { 239 for (WorkflowActionBean action : actions) { // Rerun from failed nodes 240 if (action.getStatus() == WorkflowAction.Status.OK) { 241 nodesToSkip.add(action.getName()); 242 } 243 } 244 LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size()); 245 } 246 StringBuilder tmp = new StringBuilder(); 247 for (String node : nodesToSkip) { 248 tmp.append(node).append(","); 249 } 250 LOG.debug("SkipNode List :" + tmp); 251 } 252 } 253 catch (Exception ex) { 254 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 255 } 256 } 257 258 /** 259 * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed - 260 * The nodes that are to be skipped are to be completed 
successfully in the base run. 261 * 262 * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions 263 */ 264 @Override 265 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 266 super.eagerVerifyPrecondition(); 267 if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED) 268 || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals( 269 WorkflowJob.Status.SUCCEEDED))) { 270 throw new CommandException(ErrorCode.E0805, wfBean.getStatus()); 271 } 272 Set<String> unmachedNodes = new HashSet<String>(nodesToSkip); 273 for (WorkflowActionBean action : actions) { 274 if (nodesToSkip.contains(action.getName())) { 275 if (!action.getStatus().equals(WorkflowAction.Status.OK) 276 && !action.getStatus().equals(WorkflowAction.Status.ERROR)) { 277 throw new CommandException(ErrorCode.E0806, action.getName()); 278 } 279 unmachedNodes.remove(action.getName()); 280 } 281 } 282 if (unmachedNodes.size() > 0) { 283 StringBuilder sb = new StringBuilder(); 284 String separator = ""; 285 for (String s : unmachedNodes) { 286 sb.append(separator).append(s); 287 separator = ","; 288 } 289 throw new CommandException(ErrorCode.E0807, sb); 290 } 291 } 292 293 /** 294 * Copys the variables for skipped nodes from the old wfInstance to new one. 
295 * 296 * @param newWfInstance : Source WF instance object 297 * @param oldWfInstance : Update WF instance 298 */ 299 private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) { 300 Map<String, String> oldVars = new HashMap<String, String>(); 301 Map<String, String> newVars = new HashMap<String, String>(); 302 oldVars = oldWfInstance.getAllVars(); 303 for (String var : oldVars.keySet()) { 304 String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0]; 305 if (nodesToSkip.contains(actionName)) { 306 newVars.put(var, oldVars.get(var)); 307 } 308 } 309 for (String node : nodesToSkip) { 310 // Setting the TO_SKIP variable to true. This will be used by 311 // SignalCommand and LiteNodeHandler to skip the action. 312 newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true"); 313 String visitedFlag = NodeHandler.getLoopFlag(node); 314 // Removing the visited flag so that the action won't be considered 315 // a loop. 316 if (newVars.containsKey(visitedFlag)) { 317 newVars.remove(visitedFlag); 318 } 319 } 320 newWfInstance.setAllVars(newVars); 321 } 322 323 /* (non-Javadoc) 324 * @see org.apache.oozie.command.XCommand#getEntityKey() 325 */ 326 @Override 327 public String getEntityKey() { 328 return this.jobId; 329 } 330 331 /* (non-Javadoc) 332 * @see org.apache.oozie.command.XCommand#isLockRequired() 333 */ 334 @Override 335 protected boolean isLockRequired() { 336 return true; 337 } 338 339 /* (non-Javadoc) 340 * @see org.apache.oozie.command.XCommand#loadState() 341 */ 342 @Override 343 protected void loadState() throws CommandException { 344 } 345 346 /* (non-Javadoc) 347 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 348 */ 349 @Override 350 protected void verifyPrecondition() throws CommandException, PreconditionException { 351 } 352 }