/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.util.HashSet;
import java.util.Set;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.Command;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;

/**
 * Command that prepares an already finished workflow job for another run.
 * <p/>
 * The command verifies that the job is in a terminal state, rebuilds the workflow instance from the supplied
 * configuration, deletes the actions that are not listed in {@link OozieClient#RERUN_SKIP_NODES}, carries the
 * variables of the skipped nodes over to the new instance and finally puts the job back into
 * {@link WorkflowJob.Status#PREP} with an incremented run number.
 */
public class ReRunCommand extends WorkflowCommand<Void> {

    private final String jobId;
    private final Configuration conf;
    private final String authToken;
    /** Names of the nodes that must not be executed again; filled by {@link #parseSkippedNodeConf()}. */
    private final Set<String> nodesToSkip = new HashSet<String>();
    /** Suffix of the per-node workflow variable that marks a node as "to be skipped". */
    public static final String TO_SKIP = "TO_SKIP";

    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-config blacklist is the user blacklist plus the Hadoop identity properties.
        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
                WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Creates a re-run command.
     *
     * @param jobId id of the workflow job to re-run, validated with {@link ParamChecker#notEmpty}
     * @param conf configuration for the new run, validated with {@link ParamChecker#notNull}
     * @param authToken authentication token, validated with {@link ParamChecker#notEmpty}
     */
    public ReRunCommand(String jobId, Configuration conf, String authToken) {
        super("rerun", "rerun", 1, XLog.STD);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /**
     * Checks the pre-conditions that are required for a workflow to be re-run:
     * <ul>
     * <li>the last run of the workflow must have ended (status FAILED, KILLED or SUCCEEDED);</li>
     * <li>every node that is to be skipped must exist among the actions of the base run and must have status
     * OK or ERROR.</li>
     * </ul>
     *
     * @param wfBean Workflow bean
     * @param actions List of actions of Workflow
     * @throws org.apache.oozie.command.CommandException On failure of pre-conditions: E0805 for a job that has
     * not ended, E0806 for a skipped node with another status, E0807 for skipped nodes that match no action
     */
    private void checkPreConditions(WorkflowJobBean wfBean, List<WorkflowActionBean> actions) throws CommandException {
        if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
                || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
                WorkflowJob.Status.SUCCEEDED))) {
            throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
        }
        // Start from all requested names and strike out each one that is found among the actions.
        Set<String> unmatchedNodes = new HashSet<String>(nodesToSkip);
        for (WorkflowActionBean action : actions) {
            if (nodesToSkip.contains(action.getName())) {
                if (!action.getStatus().equals(WorkflowAction.Status.OK)
                        && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
                    throw new CommandException(ErrorCode.E0806, action.getName());
                }
                unmatchedNodes.remove(action.getName());
            }
        }
        if (unmatchedNodes.size() > 0) {
            StringBuilder sb = new StringBuilder();
            String separator = "";
            for (String s : unmatchedNodes) {
                sb.append(separator).append(s);
                separator = ",";
            }
            throw new CommandException(ErrorCode.E0807, sb);
        }
    }

    /**
     * Parses the config and adds the nodes that are to be skipped to the skipped node list
     */
    private void parseSkippedNodeConf() {
        if (conf != null) {
            Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
            for (String str : skipNodes) {
                // trimming is required
                nodesToSkip.add(str.trim());
            }
        }
    }

    /**
     * Performs the re-run preparation against the given store.
     *
     * @param store workflow store used to load the job and its actions, delete actions and persist the job
     * @return always <code>null</code>
     * @throws StoreException if the store fails or the new workflow instance cannot be created
     * @throws CommandException if a pre-condition fails, the application cannot be parsed, the default
     * configuration cannot be read (E0803) or a disallowed property is present
     */
    protected Void call(WorkflowStore store) throws StoreException, CommandException {
        incrJobCounter(1);
        WorkflowJobBean wfBean = store.getWorkflow(jobId, false);
        setLogInfo(wfBean);
        List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, false);
        WorkflowInstance oldWfInstance = wfBean.getWorkflowInstance();
        WorkflowInstance newWfInstance;
        XLog log = XLog.getLog(getClass());
        parseSkippedNodeConf();
        checkPreConditions(wfBean, actions);

        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            WorkflowApp app = wps.parseDef(conf, authToken);
            XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            // The optional default configuration lives next to the workflow application definition.
            Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(),
                                          SubmitCommand.CONFIG_DEFAULT);
            FileSystem fs = Services.get().get(HadoopAccessorService.class).
                    createFileSystem(wfBean.getUser(), wfBean.getGroup(), configDefault.toUri(), protoActionConf);

            if (fs.exists(configDefault)) {
                Configuration defaultConf;
                InputStream is = fs.open(configDefault);
                try {
                    // Assumes XConfiguration consumes the stream inside its constructor, so it is
                    // safe to close right after - TODO confirm.
                    defaultConf = new XConfiguration(is);
                }
                finally {
                    // The stream was previously never closed.
                    is.close();
                }
                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                XConfiguration.injectDefaults(defaultConf, conf);
            }

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            try {
                newWfInstance = workflowLib.createInstance(app, conf, jobId);
            }
            catch (WorkflowException e) {
                throw new StoreException(e);
            }
            wfBean.setAppName(app.getName());
            wfBean.setProtoActionConf(protoActionConf.toXmlString());
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (IOException ex) {
            throw new CommandException(ErrorCode.E0803, ex);
        }
        catch (HadoopAccessorException e) {
            throw new CommandException(e);
        }

        boolean hasSkippedAction = false;
        for (WorkflowActionBean action : actions) {
            if (nodesToSkip.contains(action.getName())) {
                hasSkippedAction = true;
            }
            else {
                store.deleteAction(action.getId());
                log.info("Deleting Action[{0}] for re-run", action.getId());
            }
        }
        // copyActionData() does not depend on the individual action, so one call is enough
        // no matter how many actions are skipped.
        if (hasSkippedAction) {
            copyActionData(newWfInstance, oldWfInstance);
        }

        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
        wfBean.setUser(conf.get(OozieClient.USER_NAME));
        wfBean.setGroup(conf.get(OozieClient.GROUP_NAME));
        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
        wfBean.setEndTime(null);
        wfBean.setRun(wfBean.getRun() + 1);
        wfBean.setStatus(WorkflowJob.Status.PREP);
        wfBean.setWorkflowInstance(newWfInstance);
        store.updateWorkflow(wfBean);
        return null;
    }

    /**
     * Copies the variables for skipped nodes from the old wfInstance to new one.
     * <p/>
     * A variable belongs to a node when the part of its name before the first
     * {@link WorkflowInstance#NODE_VAR_SEPARATOR} equals the node name. The variables of the new instance are
     * replaced by the resulting map.
     *
     * @param newWfInstance instance that receives the variables
     * @param oldWfInstance instance of the previous run the variables are read from
     */
    private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
        Map<String, String> oldVars = oldWfInstance.getAllVars();
        Map<String, String> newVars = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : oldVars.entrySet()) {
            String actionName = entry.getKey().split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
            if (nodesToSkip.contains(actionName)) {
                newVars.put(entry.getKey(), entry.getValue());
            }
        }
        for (String node : nodesToSkip) {
            // Setting the TO_SKIP variable to true. This will be used by
            // SignalCommand and LiteNodeHandler to skip the action.
            newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
            // Removing the visited flag so that the action won't be considered
            // a loop.
            newVars.remove(NodeHandler.getLoopFlag(node));
        }
        newWfInstance.setAllVars(newVars);
    }

    /**
     * Runs {@link #call(WorkflowStore)} if the lock for the job can be acquired; otherwise, or when the
     * thread is interrupted, queues a new <code>ReRunCommand</code> with the same arguments after
     * <code>LOCK_FAILURE_REQUEUE_INTERVAL</code>.
     *
     * @param store workflow store handed on to {@link #call(WorkflowStore)}
     * @return always <code>null</code>
     * @throws CommandException propagated from {@link #call(WorkflowStore)}
     * @throws StoreException propagated from {@link #call(WorkflowStore)}
     */
    @Override
    protected Void execute(WorkflowStore store) throws CommandException, StoreException {
        XLog log = XLog.getLog(getClass());
        try {
            log.debug("STARTED ReRunCommand for job " + jobId);
            if (lock(jobId)) {
                call(store);
            }
            else {
                queueCallable(new ReRunCommand(jobId, conf, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
                log.warn("ReRunCommand lock was not acquired - failed {0}", jobId);
            }
        }
        catch (InterruptedException e) {
            // NOTE(review): the interrupt status of the thread is not restored here; confirm whether the
            // command framework relies on that before adding Thread.currentThread().interrupt().
            queueCallable(new ReRunCommand(jobId, conf, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
            log.warn("ReRunCommand lock was not acquired - interrupted exception failed {0}", jobId);
        }
        log.debug("ENDED ReRunCommand for job " + jobId);
        return null;
    }
}