This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.HashSet;
021 import java.util.Set;
022 import java.io.IOException;
023 import java.util.Collection;
024 import java.util.HashMap;
025 import java.util.List;
026 import java.util.Map;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.oozie.client.WorkflowAction;
032 import org.apache.oozie.client.WorkflowJob;
033 import org.apache.oozie.client.OozieClient;
034 import org.apache.oozie.WorkflowActionBean;
035 import org.apache.oozie.WorkflowJobBean;
036 import org.apache.oozie.ErrorCode;
037 import org.apache.oozie.service.HadoopAccessorException;
038 import org.apache.oozie.service.WorkflowAppService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.service.DagXLogInfoService;
041 import org.apache.oozie.service.WorkflowStoreService;
042 import org.apache.oozie.service.HadoopAccessorService;
043 import org.apache.oozie.util.ParamChecker;
044 import org.apache.oozie.util.PropertiesUtils;
045 import org.apache.oozie.util.XLog;
046 import org.apache.oozie.util.XConfiguration;
047 import org.apache.oozie.util.XmlUtils;
048 import org.apache.oozie.command.Command;
049 import org.apache.oozie.command.CommandException;
050 import org.apache.oozie.store.StoreException;
051 import org.apache.oozie.store.WorkflowStore;
052 import org.apache.oozie.workflow.WorkflowApp;
053 import org.apache.oozie.workflow.WorkflowException;
054 import org.apache.oozie.workflow.WorkflowInstance;
055 import org.apache.oozie.workflow.WorkflowLib;
056 import org.apache.oozie.workflow.lite.NodeHandler;
057
058 public class ReRunCommand extends WorkflowCommand<Void> {
059
/** Name suffix of the per-node workflow variable that flags a node as one to be skipped on re-run. */
public static final String TO_SKIP = "TO_SKIP";

/** Property names that are rejected when found in the application's default configuration. */
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

/** Property names that are rejected when found in the job configuration. */
private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

static {
    // Names rejected in both the job configuration and the default configuration.
    String[] commonDisallowed = {
        PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
        PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
        PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
        PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS
    };
    PropertiesUtils.createPropertySet(commonDisallowed, DISALLOWED_USER_PROPERTIES);
    PropertiesUtils.createPropertySet(commonDisallowed, DISALLOWED_DEFAULT_PROPERTIES);

    // Names rejected only in the default configuration.
    String[] defaultOnlyDisallowed = {
        PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
        WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME
    };
    PropertiesUtils.createPropertySet(defaultOnlyDisallowed, DISALLOWED_DEFAULT_PROPERTIES);
}

/** Id of the workflow job to re-run. */
private String jobId;

/** Configuration supplied with the re-run request. */
private Configuration conf;

/** Authentication token handed to the workflow application service. */
private String authToken;

/** Names of the nodes that must not be executed again; populated from the configuration. */
private Set<String> nodesToSkip = new HashSet<String>();
081
082 public ReRunCommand(String jobId, Configuration conf, String authToken) {
083 super("rerun", "rerun", 1, XLog.STD);
084 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
085 this.conf = ParamChecker.notNull(conf, "conf");
086 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
087 }
088
089 /**
090 * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
091 * The nodes that are to be skipped are to be completed successfully in the base run.
092 *
093 * @param wfBean Workflow bean
094 * @param actions List of actions of Workflow
095 * @throws org.apache.oozie.command.CommandException On failure of pre-conditions
096 */
097 private void checkPreConditions(WorkflowJobBean wfBean, List<WorkflowActionBean> actions) throws CommandException {
098 if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
099 || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
100 WorkflowJob.Status.SUCCEEDED))) {
101 throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
102 }
103 Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
104 for (WorkflowActionBean action : actions) {
105 if (nodesToSkip.contains(action.getName())) {
106 if (!action.getStatus().equals(WorkflowAction.Status.OK)
107 && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
108 throw new CommandException(ErrorCode.E0806, action.getName());
109 }
110 unmachedNodes.remove(action.getName());
111 }
112 }
113 if (unmachedNodes.size() > 0) {
114 StringBuilder sb = new StringBuilder();
115 String separator = "";
116 for (String s : unmachedNodes) {
117 sb.append(separator).append(s);
118 separator = ",";
119 }
120 throw new CommandException(ErrorCode.E0807, sb);
121 }
122 }
123
124 /**
125 * Parses the config and adds the nodes that are to be skipped to the skipped node list
126 */
127 private void parseSkippedNodeConf() {
128 if (conf != null) {
129 Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
130 for (String str : skipNodes) {
131 // trimming is required
132 nodesToSkip.add(str.trim());
133 }
134 }
135 }
136
137 protected Void call(WorkflowStore store) throws StoreException, CommandException {
138 incrJobCounter(1);
139 WorkflowJobBean wfBean = store.getWorkflow(jobId, false);
140 setLogInfo(wfBean);
141 List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, false);
142 WorkflowInstance oldWfInstance = wfBean.getWorkflowInstance();
143 WorkflowInstance newWfInstance;
144 XLog log = XLog.getLog(getClass());
145 parseSkippedNodeConf();
146 checkPreConditions(wfBean, actions);
147
148 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
149 try {
150 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
151 WorkflowApp app = wps.parseDef(conf, authToken);
152 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
153 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
154
155 Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(), SubmitCommand.CONFIG_DEFAULT);
156 FileSystem fs = Services.get().get(HadoopAccessorService.class).
157 createFileSystem(wfBean.getUser(), wfBean.getGroup(), configDefault.toUri(), protoActionConf);
158
159 if (fs.exists(configDefault)) {
160 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
161 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
162 XConfiguration.injectDefaults(defaultConf, conf);
163 }
164
165 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
166
167 try {
168 newWfInstance = workflowLib.createInstance(app, conf, jobId);
169 }
170 catch (WorkflowException e) {
171 throw new StoreException(e);
172 }
173 wfBean.setAppName(app.getName());
174 wfBean.setProtoActionConf(protoActionConf.toXmlString());
175 }
176 catch (WorkflowException ex) {
177 throw new CommandException(ex);
178 }
179 catch (IOException ex) {
180 throw new CommandException(ErrorCode.E0803, ex);
181 }
182 catch (HadoopAccessorException e) {
183 throw new CommandException(e);
184 }
185
186 for (int i = 0; i < actions.size(); i++) {
187 if (!nodesToSkip.contains(actions.get(i).getName())) {
188 store.deleteAction(actions.get(i).getId());
189 log.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
190 }
191 else {
192 copyActionData(newWfInstance, oldWfInstance);
193 }
194 }
195
196 wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
197 wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
198 wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
199 wfBean.setUser(conf.get(OozieClient.USER_NAME));
200 wfBean.setGroup(conf.get(OozieClient.GROUP_NAME));
201 wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
202 wfBean.setEndTime(null);
203 wfBean.setRun(wfBean.getRun() + 1);
204 wfBean.setStatus(WorkflowJob.Status.PREP);
205 wfBean.setWorkflowInstance(newWfInstance);
206 store.updateWorkflow(wfBean);
207 return null;
208 }
209
210 /**
211 * Copys the variables for skipped nodes from the old wfInstance to new one.
212 *
213 * @param newWfInstance
214 * @param oldWfInstance
215 */
216 private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
217 Map<String, String> oldVars = new HashMap<String, String>();
218 Map<String, String> newVars = new HashMap<String, String>();
219 oldVars = oldWfInstance.getAllVars();
220 for (String var : oldVars.keySet()) {
221 String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
222 if (nodesToSkip.contains(actionName)) {
223 newVars.put(var, oldVars.get(var));
224 }
225 }
226 for (String node : nodesToSkip) {
227 // Setting the TO_SKIP variable to true. This will be used by
228 // SignalCommand and LiteNodeHandler to skip the action.
229 newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
230 String visitedFlag = NodeHandler.getLoopFlag(node);
231 // Removing the visited flag so that the action won't be considered
232 // a loop.
233 if (newVars.containsKey(visitedFlag)) {
234 newVars.remove(visitedFlag);
235 }
236 }
237 newWfInstance.setAllVars(newVars);
238 }
239
240 @Override
241 protected Void execute(WorkflowStore store) throws CommandException, StoreException {
242 try {
243 XLog.getLog(getClass()).debug("STARTED ReRunCommand for job " + jobId);
244 if (lock(jobId)) {
245 call(store);
246 }
247 else {
248 queueCallable(new ReRunCommand(jobId, conf, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
249 XLog.getLog(getClass()).warn("ReRunCommand lock was not acquired - failed {0}", jobId);
250 }
251 }
252 catch (InterruptedException e) {
253 queueCallable(new ReRunCommand(jobId, conf, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
254 XLog.getLog(getClass())
255 .warn("ReRunCommand lock was not acquired - interrupted exception failed {0}", jobId);
256 }
257 XLog.getLog(getClass()).debug("ENDED ReRunCommand for job " + jobId);
258 return null;
259 }
260 }