This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.util.Collection;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.oozie.ErrorCode;
034 import org.apache.oozie.WorkflowActionBean;
035 import org.apache.oozie.WorkflowJobBean;
036 import org.apache.oozie.client.OozieClient;
037 import org.apache.oozie.client.WorkflowAction;
038 import org.apache.oozie.client.WorkflowJob;
039 import org.apache.oozie.command.CommandException;
040 import org.apache.oozie.command.PreconditionException;
041 import org.apache.oozie.executor.jpa.JPAExecutorException;
042 import org.apache.oozie.executor.jpa.WorkflowActionDeleteJPAExecutor;
043 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
044 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
046 import org.apache.oozie.service.DagXLogInfoService;
047 import org.apache.oozie.service.HadoopAccessorException;
048 import org.apache.oozie.service.HadoopAccessorService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.Services;
051 import org.apache.oozie.service.WorkflowAppService;
052 import org.apache.oozie.service.WorkflowStoreService;
053 import org.apache.oozie.util.InstrumentUtils;
054 import org.apache.oozie.util.LogUtils;
055 import org.apache.oozie.util.ParamChecker;
056 import org.apache.oozie.util.PropertiesUtils;
057 import org.apache.oozie.util.XConfiguration;
058 import org.apache.oozie.util.XLog;
059 import org.apache.oozie.util.XmlUtils;
060 import org.apache.oozie.workflow.WorkflowApp;
061 import org.apache.oozie.workflow.WorkflowException;
062 import org.apache.oozie.workflow.WorkflowInstance;
063 import org.apache.oozie.workflow.WorkflowLib;
064 import org.apache.oozie.workflow.lite.NodeHandler;
065
066 /**
067 * This is a RerunXCommand which is used for rerunn.
068 *
069 */
070 public class ReRunXCommand extends WorkflowXCommand<Void> {
071 private final String jobId;
072 private final Configuration conf;
073 private final String authToken;
074 private final Set<String> nodesToSkip = new HashSet<String>();
075 public static final String TO_SKIP = "TO_SKIP";
076 private WorkflowJobBean wfBean;
077 private List<WorkflowActionBean> actions;
078 private JPAService jpaService;
079
080 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
081 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
082
083 static {
084 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
085 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
086 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
087 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
088 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
089
090 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
091 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
092 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
093 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
094 }
095
096 public ReRunXCommand(String jobId, Configuration conf, String authToken) {
097 super("rerun", "rerun", 1);
098 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
099 this.conf = ParamChecker.notNull(conf, "conf");
100 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
101 }
102
103 /* (non-Javadoc)
104 * @see org.apache.oozie.command.XCommand#execute()
105 */
106 @Override
107 protected Void execute() throws CommandException {
108 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
109 LogUtils.setLogInfo(wfBean, logInfo);
110 WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
111 WorkflowInstance newWfInstance;
112
113 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
114 try {
115 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
116 WorkflowApp app = wps.parseDef(conf, authToken);
117 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
118 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
119
120 URI uri = new URI(conf.get(OozieClient.APP_PATH));
121 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(wfBean.getUser(),
122 wfBean.getGroup(), uri, new Configuration());
123
124 Path configDefault = null;
125 // app path could be a directory
126 Path path = new Path(uri.getPath());
127 if (!fs.isFile(path)) {
128 configDefault = new Path(path, SubmitCommand.CONFIG_DEFAULT);
129 } else {
130 configDefault = new Path(path.getParent(), SubmitCommand.CONFIG_DEFAULT);
131 }
132
133 if (fs.exists(configDefault)) {
134 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
135 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
136 XConfiguration.injectDefaults(defaultConf, conf);
137 }
138
139 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
140
141 try {
142 newWfInstance = workflowLib.createInstance(app, conf, jobId);
143 }
144 catch (WorkflowException e) {
145 throw new CommandException(e);
146 }
147 wfBean.setAppName(app.getName());
148 wfBean.setProtoActionConf(protoActionConf.toXmlString());
149 }
150 catch (WorkflowException ex) {
151 throw new CommandException(ex);
152 }
153 catch (IOException ex) {
154 throw new CommandException(ErrorCode.E0803, ex);
155 }
156 catch (HadoopAccessorException ex) {
157 throw new CommandException(ex);
158 }
159 catch (URISyntaxException ex) {
160 throw new CommandException(ErrorCode.E0711, ex.getMessage(), ex);
161 }
162
163 try {
164 for (int i = 0; i < actions.size(); i++) {
165 if (!nodesToSkip.contains(actions.get(i).getName())) {
166 jpaService.execute(new WorkflowActionDeleteJPAExecutor(actions.get(i).getId()));
167 LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
168 }
169 else {
170 copyActionData(newWfInstance, oldWfInstance);
171 }
172 }
173
174 wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
175 wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
176 wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
177 wfBean.setUser(conf.get(OozieClient.USER_NAME));
178 wfBean.setGroup(conf.get(OozieClient.GROUP_NAME));
179 wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
180 wfBean.setEndTime(null);
181 wfBean.setRun(wfBean.getRun() + 1);
182 wfBean.setStatus(WorkflowJob.Status.PREP);
183 wfBean.setWorkflowInstance(newWfInstance);
184 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
185 }
186 catch (JPAExecutorException e) {
187 throw new CommandException(e);
188 }
189
190 return null;
191 }
192
193 /**
194 * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
195 * skipped node list
196 *
197 * @throws CommandException
198 */
199 @Override
200 protected void eagerLoadState() throws CommandException {
201 super.eagerLoadState();
202 try {
203 jpaService = Services.get().get(JPAService.class);
204 if (jpaService != null) {
205 this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
206 this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
207 }
208 else {
209 throw new CommandException(ErrorCode.E0610);
210 }
211
212 if (conf != null) {
213 if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
214 Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
215 for (String str : skipNodes) {
216 // trimming is required
217 nodesToSkip.add(str.trim());
218 }
219 LOG.debug("Skipnode size :" + nodesToSkip.size());
220 }
221 else {
222 for (WorkflowActionBean action : actions) { // Rerun from failed nodes
223 if (action.getStatus() == WorkflowAction.Status.OK) {
224 nodesToSkip.add(action.getName());
225 }
226 }
227 LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
228 }
229 StringBuilder tmp = new StringBuilder();
230 for (String node : nodesToSkip) {
231 tmp.append(node).append(",");
232 }
233 LOG.debug("SkipNode List :" + tmp);
234 }
235 }
236 catch (Exception ex) {
237 throw new CommandException(ErrorCode.E0603, ex);
238 }
239 }
240
241 /**
242 * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
243 * The nodes that are to be skipped are to be completed successfully in the base run.
244 *
245 * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions
246 */
247 @Override
248 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
249 super.eagerVerifyPrecondition();
250 if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
251 || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
252 WorkflowJob.Status.SUCCEEDED))) {
253 throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
254 }
255 Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
256 for (WorkflowActionBean action : actions) {
257 if (nodesToSkip.contains(action.getName())) {
258 if (!action.getStatus().equals(WorkflowAction.Status.OK)
259 && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
260 throw new CommandException(ErrorCode.E0806, action.getName());
261 }
262 unmachedNodes.remove(action.getName());
263 }
264 }
265 if (unmachedNodes.size() > 0) {
266 StringBuilder sb = new StringBuilder();
267 String separator = "";
268 for (String s : unmachedNodes) {
269 sb.append(separator).append(s);
270 separator = ",";
271 }
272 throw new CommandException(ErrorCode.E0807, sb);
273 }
274 }
275
276 /**
277 * Copys the variables for skipped nodes from the old wfInstance to new one.
278 *
279 * @param newWfInstance : Source WF instance object
280 * @param oldWfInstance : Update WF instance
281 */
282 private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
283 Map<String, String> oldVars = new HashMap<String, String>();
284 Map<String, String> newVars = new HashMap<String, String>();
285 oldVars = oldWfInstance.getAllVars();
286 for (String var : oldVars.keySet()) {
287 String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
288 if (nodesToSkip.contains(actionName)) {
289 newVars.put(var, oldVars.get(var));
290 }
291 }
292 for (String node : nodesToSkip) {
293 // Setting the TO_SKIP variable to true. This will be used by
294 // SignalCommand and LiteNodeHandler to skip the action.
295 newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
296 String visitedFlag = NodeHandler.getLoopFlag(node);
297 // Removing the visited flag so that the action won't be considered
298 // a loop.
299 if (newVars.containsKey(visitedFlag)) {
300 newVars.remove(visitedFlag);
301 }
302 }
303 newWfInstance.setAllVars(newVars);
304 }
305
306 /* (non-Javadoc)
307 * @see org.apache.oozie.command.XCommand#getEntityKey()
308 */
309 @Override
310 protected String getEntityKey() {
311 return this.jobId;
312 }
313
314 /* (non-Javadoc)
315 * @see org.apache.oozie.command.XCommand#isLockRequired()
316 */
317 @Override
318 protected boolean isLockRequired() {
319 return true;
320 }
321
322 /* (non-Javadoc)
323 * @see org.apache.oozie.command.XCommand#loadState()
324 */
325 @Override
326 protected void loadState() throws CommandException {
327 }
328
329 /* (non-Javadoc)
330 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
331 */
332 @Override
333 protected void verifyPrecondition() throws CommandException, PreconditionException {
334 }
335 }