/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;

/**
 * Command that re-runs a workflow job.
 * <p>
 * The job must have finished (FAILED, KILLED or SUCCEEDED). Actions that are not in the skip list are
 * deleted, the variables of the skipped actions are carried over into a freshly created workflow
 * instance, and the job bean is reset to PREP with its run counter incremented.
 */
public class ReRunXCommand extends WorkflowXCommand<Void> {
    private final String jobId;
    private Configuration conf;
    private final String authToken;
    private final Set<String> nodesToSkip = new HashSet<String>();
    public static final String TO_SKIP = "TO_SKIP";
    private WorkflowJobBean wfBean;
    private List<WorkflowActionBean> actions;
    private JPAService jpaService;
    private List<JsonBean> updateList = new ArrayList<JsonBean>();
    private List<JsonBean> deleteList = new ArrayList<JsonBean>();

    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-properties set is the user set plus HADOOP_USER.
        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Creates a re-run command.
     *
     * @param jobId id of the workflow job to re-run; must not be empty
     * @param conf job configuration for the re-run; must not be null
     * @param authToken authentication token; must not be empty
     */
    public ReRunXCommand(String jobId, Configuration conf, String authToken) {
        super("rerun", "rerun", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /**
     * Builds a new workflow instance from the application definition, schedules the non-skipped actions
     * for deletion, resets the job bean and persists everything with one bulk update/delete.
     *
     * @return always <code>null</code>
     * @throws CommandException if the definition cannot be parsed, the file system cannot be accessed,
     *         a disallowed property is present, or the bulk write fails
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        LogUtils.setLogInfo(wfBean, logInfo);
        WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
        WorkflowInstance newWfInstance;

        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            WorkflowApp app = wps.parseDef(conf, authToken);
            XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            URI uri = new URI(conf.get(OozieClient.APP_PATH));
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);

            // The app path may be either a directory or the workflow file itself; the defaults file
            // lives next to the workflow definition in both cases.
            Path path = new Path(uri.getPath());
            Path configDefault;
            if (!fs.isFile(path)) {
                configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
            }
            else {
                configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                // Close the stream explicitly; it was previously leaked. The stream stays open until
                // the defaults have been checked and injected, in case the configuration reads lazily.
                InputStream defaultConfStream = fs.open(configDefault);
                try {
                    Configuration defaultConf = new XConfiguration(defaultConfStream);
                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                    XConfiguration.injectDefaults(defaultConf, conf);
                }
                finally {
                    defaultConfStream.close();
                }
            }

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties. This ensures the Hadoop Configuration
            // semantics are preserved. The Configuration.get function within XConfiguration.resolve()
            // works recursively to get the final value corresponding to a key in the map.
            // Resetting the conf to contain all the resolved values is necessary to ensure propagation
            // of Oozie properties to Hadoop calls downstream.
            conf = ((XConfiguration) conf).resolve();

            // A WorkflowException raised here is handled by the catch clause below.
            newWfInstance = workflowLib.createInstance(app, conf, jobId);

            wfBean.setAppName(app.getName());
            wfBean.setProtoActionConf(protoActionConf.toXmlString());
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (IOException ex) {
            throw new CommandException(ErrorCode.E0803, ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CommandException(ex);
        }
        catch (URISyntaxException ex) {
            throw new CommandException(ErrorCode.E0711, ex.getMessage(), ex);
        }

        for (WorkflowActionBean action : actions) {
            if (!nodesToSkip.contains(action.getName())) {
                deleteList.add(action);
                LOG.info("Deleting Action[{0}] for re-run", action.getId());
            }
            else {
                // NOTE(review): this copies the same variable set once per skipped action; a single
                // call would presumably suffice if setAllVars is idempotent - confirm before hoisting.
                copyActionData(newWfInstance, oldWfInstance);
            }
        }

        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
        wfBean.setUser(conf.get(OozieClient.USER_NAME));
        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
        wfBean.setGroup(group);
        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
        wfBean.setEndTime(null);
        wfBean.setRun(wfBean.getRun() + 1);
        wfBean.setStatus(WorkflowJob.Status.PREP);
        wfBean.setWorkflowInstance(newWfInstance);

        try {
            wfBean.setLastModifiedTime(new Date());
            updateList.add(wfBean);
            // call JPAExecutor to do the bulk writes
            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }

        return null;
    }

    /**
     * Loads the workflow job and its actions, then fills the list of nodes to skip.
     * <p>
     * When {@link OozieClient#RERUN_FAIL_NODES} is false (the default), the skip list is read from
     * {@link OozieClient#RERUN_SKIP_NODES}; otherwise every action whose status is OK is skipped.
     *
     * @throws CommandException with E0610 if no JPA service is available, or E0603 wrapping any other
     *         failure while loading
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        super.eagerLoadState();
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }
            this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
            this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));

            if (conf != null) {
                if (!conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) { // Rerun with skipNodes
                    Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
                    for (String str : skipNodes) {
                        // trimming is required
                        nodesToSkip.add(str.trim());
                    }
                    LOG.debug("Skipnode size :" + nodesToSkip.size());
                }
                else {
                    for (WorkflowActionBean action : actions) { // Rerun from failed nodes
                        if (action.getStatus() == WorkflowAction.Status.OK) {
                            nodesToSkip.add(action.getName());
                        }
                    }
                    LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
                }
                StringBuilder tmp = new StringBuilder();
                for (String node : nodesToSkip) {
                    tmp.append(node).append(",");
                }
                LOG.debug("SkipNode List :" + tmp);
            }
        }
        catch (CommandException ex) {
            // Already carries its own error code (E0610 above); do not re-wrap it as E0603.
            throw ex;
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex);
        }
    }

    /**
     * Checks the pre-conditions that are required for the workflow to be re-run:
     * <ul>
     * <li>the last run of the workflow must be finished (FAILED, KILLED or SUCCEEDED);</li>
     * <li>every node to skip must exist in the base run with status OK or ERROR.</li>
     * </ul>
     *
     * @throws CommandException with E0805 if the job is not finished, E0806 if a node to skip has
     *         another status, or E0807 listing the nodes to skip that match no action
     * @throws PreconditionException declared by the overridden method
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        super.eagerVerifyPrecondition();
        WorkflowJob.Status jobStatus = wfBean.getStatus();
        boolean finished = jobStatus.equals(WorkflowJob.Status.FAILED)
                || jobStatus.equals(WorkflowJob.Status.KILLED)
                || jobStatus.equals(WorkflowJob.Status.SUCCEEDED);
        if (!finished) {
            throw new CommandException(ErrorCode.E0805, jobStatus);
        }

        // Start with every requested node and cross off the ones backed by a real action.
        Set<String> unmatchedNodes = new HashSet<String>(nodesToSkip);
        for (WorkflowActionBean action : actions) {
            if (nodesToSkip.contains(action.getName())) {
                if (!action.getStatus().equals(WorkflowAction.Status.OK)
                        && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
                    throw new CommandException(ErrorCode.E0806, action.getName());
                }
                unmatchedNodes.remove(action.getName());
            }
        }
        if (!unmatchedNodes.isEmpty()) {
            StringBuilder sb = new StringBuilder();
            String separator = "";
            for (String s : unmatchedNodes) {
                sb.append(separator).append(s);
                separator = ",";
            }
            throw new CommandException(ErrorCode.E0807, sb.toString());
        }
    }

    /**
     * Copies the variables of the skipped nodes from the old workflow instance to the new one.
     * <p>
     * A variable belongs to a skipped node when the part of its name before
     * {@link WorkflowInstance#NODE_VAR_SEPARATOR} is in the skip list.
     *
     * @param newWfInstance workflow instance that receives the variables
     * @param oldWfInstance workflow instance the variables are read from
     */
    private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
        Map<String, String> oldVars = oldWfInstance.getAllVars();
        Map<String, String> newVars = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : oldVars.entrySet()) {
            String actionName = entry.getKey().split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
            if (nodesToSkip.contains(actionName)) {
                newVars.put(entry.getKey(), entry.getValue());
            }
        }
        for (String node : nodesToSkip) {
            // Setting the TO_SKIP variable to true. This will be used by
            // SignalCommand and LiteNodeHandler to skip the action.
            newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
            // Removing the visited flag so that the action won't be considered
            // a loop.
            newVars.remove(NodeHandler.getLoopFlag(node));
        }
        newWfInstance.setAllVars(newVars);
    }

    /**
     * Returns the id of the job being re-run.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Always returns true.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Does nothing; state is loaded in {@link #eagerLoadState()}.
     *
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
    }

    /**
     * Does nothing; pre-conditions are checked in {@link #eagerVerifyPrecondition()}.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
    }
}