This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.util.ArrayList;
024 import java.util.Collection;
025 import java.util.Date;
026 import java.util.HashMap;
027 import java.util.HashSet;
028 import java.util.List;
029 import java.util.Map;
030 import java.util.Set;
031
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.Path;
035 import org.apache.oozie.AppType;
036 import org.apache.oozie.ErrorCode;
037 import org.apache.oozie.WorkflowActionBean;
038 import org.apache.oozie.WorkflowJobBean;
039 import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
040 import org.apache.oozie.client.OozieClient;
041 import org.apache.oozie.client.WorkflowAction;
042 import org.apache.oozie.client.WorkflowJob;
043 import org.apache.oozie.client.rest.JsonBean;
044 import org.apache.oozie.command.CommandException;
045 import org.apache.oozie.command.PreconditionException;
046 import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
047 import org.apache.oozie.executor.jpa.JPAExecutorException;
048 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
049 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
050 import org.apache.oozie.service.DagXLogInfoService;
051 import org.apache.oozie.service.HadoopAccessorException;
052 import org.apache.oozie.service.HadoopAccessorService;
053 import org.apache.oozie.service.JPAService;
054 import org.apache.oozie.service.Services;
055 import org.apache.oozie.service.UUIDService;
056 import org.apache.oozie.service.WorkflowAppService;
057 import org.apache.oozie.service.WorkflowStoreService;
058 import org.apache.oozie.sla.SLAOperations;
059 import org.apache.oozie.sla.service.SLAService;
060 import org.apache.oozie.util.ConfigUtils;
061 import org.apache.oozie.util.ELEvaluator;
062 import org.apache.oozie.util.ELUtils;
063 import org.apache.oozie.util.InstrumentUtils;
064 import org.apache.oozie.util.LogUtils;
065 import org.apache.oozie.util.ParamChecker;
066 import org.apache.oozie.util.PropertiesUtils;
067 import org.apache.oozie.util.XConfiguration;
068 import org.apache.oozie.util.XLog;
069 import org.apache.oozie.util.XmlUtils;
070 import org.apache.oozie.workflow.WorkflowApp;
071 import org.apache.oozie.workflow.WorkflowException;
072 import org.apache.oozie.workflow.WorkflowInstance;
073 import org.apache.oozie.workflow.WorkflowLib;
074 import org.apache.oozie.workflow.lite.NodeHandler;
075 import org.jdom.Element;
076 import org.jdom.JDOMException;
077
078 /**
079 * This is the ReRunXCommand, which is used to re-run a workflow job.
080 *
081 */
082 public class ReRunXCommand extends WorkflowXCommand<Void> {
083 private final String jobId;
084 private Configuration conf;
085 private final Set<String> nodesToSkip = new HashSet<String>();
086 public static final String TO_SKIP = "TO_SKIP";
087 private WorkflowJobBean wfBean;
088 private List<WorkflowActionBean> actions;
089 private JPAService jpaService;
090 private List<JsonBean> updateList = new ArrayList<JsonBean>();
091 private List<JsonBean> deleteList = new ArrayList<JsonBean>();
092
093 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
094 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
095
096 static {
097 String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
098 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
099 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
100 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
101 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
102
103 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
104 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
105 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
106 }
107
108 public ReRunXCommand(String jobId, Configuration conf) {
109 super("rerun", "rerun", 1);
110 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
111 this.conf = ParamChecker.notNull(conf, "conf");
112 }
113
114 /* (non-Javadoc)
115 * @see org.apache.oozie.command.XCommand#execute()
116 */
117 @Override
118 protected Void execute() throws CommandException {
119 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
120 LogUtils.setLogInfo(wfBean, logInfo);
121 WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
122 WorkflowInstance newWfInstance;
123 String appPath = null;
124
125 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
126 try {
127 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
128 WorkflowApp app = wps.parseDef(conf);
129 XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
130 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
131
132 appPath = conf.get(OozieClient.APP_PATH);
133 URI uri = new URI(appPath);
134 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
135 Configuration fsConf = has.createJobConf(uri.getAuthority());
136 FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);
137
138 Path configDefault = null;
139 // app path could be a directory
140 Path path = new Path(uri.getPath());
141 if (!fs.isFile(path)) {
142 configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
143 } else {
144 configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
145 }
146
147 if (fs.exists(configDefault)) {
148 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
149 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
150 XConfiguration.injectDefaults(defaultConf, conf);
151 }
152
153 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
154
155 // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are preserved.
156 // The Configuration.get function within XConfiguration.resolve() works recursively to get the final value corresponding to a key in the map
157 // Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
158 conf = ((XConfiguration) conf).resolve();
159
160 try {
161 newWfInstance = workflowLib.createInstance(app, conf, jobId);
162 }
163 catch (WorkflowException e) {
164 throw new CommandException(e);
165 }
166
167 if (SLAService.isEnabled()) {
168 Element wfElem = XmlUtils.parseXml(app.getDefinition());
169 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
170 Element eSla = XmlUtils.getSLAElement(wfElem);
171 String jobSlaXml = null;
172 if (eSla != null) {
173 jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
174 }
175 String appName = ELUtils.resolveAppName(app.getName(), conf);
176 writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
177 conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
178 evalSla);
179 }
180 wfBean.setAppName(app.getName());
181 wfBean.setProtoActionConf(protoActionConf.toXmlString());
182 }
183 catch (WorkflowException ex) {
184 throw new CommandException(ex);
185 }
186 catch (IOException ex) {
187 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
188 }
189 catch (HadoopAccessorException ex) {
190 throw new CommandException(ex);
191 }
192 catch (URISyntaxException ex) {
193 throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
194 }
195 catch (Exception ex) {
196 throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
197 }
198
199 for (int i = 0; i < actions.size(); i++) {
200 if (!nodesToSkip.contains(actions.get(i).getName())) {
201 deleteList.add(actions.get(i));
202 LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
203 }
204 else {
205 copyActionData(newWfInstance, oldWfInstance);
206 }
207 }
208
209 wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
210 wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
211 wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
212 wfBean.setUser(conf.get(OozieClient.USER_NAME));
213 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
214 wfBean.setGroup(group);
215 wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
216 wfBean.setEndTime(null);
217 wfBean.setRun(wfBean.getRun() + 1);
218 wfBean.setStatus(WorkflowJob.Status.PREP);
219 wfBean.setWorkflowInstance(newWfInstance);
220
221 try {
222 wfBean.setLastModifiedTime(new Date());
223 updateList.add(wfBean);
224 // call JPAExecutor to do the bulk writes
225 jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
226 }
227 catch (JPAExecutorException je) {
228 throw new CommandException(je);
229 }
230
231 return null;
232 }
233
234
235 @SuppressWarnings("unchecked")
236 private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
237 String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
238 if (jobSlaXml != null && jobSlaXml.length() > 0) {
239 Element eSla = XmlUtils.parseXml(jobSlaXml);
240 // insert into new table
241 SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
242 true);
243 }
244 // Add sla for wf actions
245 for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
246 Element actionSla = XmlUtils.getSLAElement(action);
247 if (actionSla != null) {
248 String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
249 actionSla = XmlUtils.parseXml(actionSlaXml);
250 if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
251 String actionId = Services.get().get(UUIDService.class)
252 .generateChildId(jobId, action.getAttributeValue("name") + "");
253 SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
254 appName, LOG, true);
255 }
256 }
257 }
258
259 }
260
261 /**
262 * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
263 * skipped node list
264 *
265 * @throws CommandException
266 */
267 @Override
268 protected void eagerLoadState() throws CommandException {
269 super.eagerLoadState();
270 try {
271 jpaService = Services.get().get(JPAService.class);
272 if (jpaService != null) {
273 this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
274 this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
275 }
276 else {
277 throw new CommandException(ErrorCode.E0610);
278 }
279
280 if (conf != null) {
281 if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
282 Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
283 for (String str : skipNodes) {
284 // trimming is required
285 nodesToSkip.add(str.trim());
286 }
287 LOG.debug("Skipnode size :" + nodesToSkip.size());
288 }
289 else {
290 for (WorkflowActionBean action : actions) { // Rerun from failed nodes
291 if (action.getStatus() == WorkflowAction.Status.OK) {
292 nodesToSkip.add(action.getName());
293 }
294 }
295 LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
296 }
297 StringBuilder tmp = new StringBuilder();
298 for (String node : nodesToSkip) {
299 tmp.append(node).append(",");
300 }
301 LOG.debug("SkipNode List :" + tmp);
302 }
303 }
304 catch (Exception ex) {
305 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
306 }
307 }
308
309 /**
310 * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
311 * The nodes that are to be skipped are to be completed successfully in the base run.
312 *
313 * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions
314 */
315 @Override
316 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
317 super.eagerVerifyPrecondition();
318 if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
319 || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
320 WorkflowJob.Status.SUCCEEDED))) {
321 throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
322 }
323 Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
324 for (WorkflowActionBean action : actions) {
325 if (nodesToSkip.contains(action.getName())) {
326 if (!action.getStatus().equals(WorkflowAction.Status.OK)
327 && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
328 throw new CommandException(ErrorCode.E0806, action.getName());
329 }
330 unmachedNodes.remove(action.getName());
331 }
332 }
333 if (unmachedNodes.size() > 0) {
334 StringBuilder sb = new StringBuilder();
335 String separator = "";
336 for (String s : unmachedNodes) {
337 sb.append(separator).append(s);
338 separator = ",";
339 }
340 throw new CommandException(ErrorCode.E0807, sb);
341 }
342 }
343
344 /**
345 * Copys the variables for skipped nodes from the old wfInstance to new one.
346 *
347 * @param newWfInstance : Source WF instance object
348 * @param oldWfInstance : Update WF instance
349 */
350 private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
351 Map<String, String> oldVars = new HashMap<String, String>();
352 Map<String, String> newVars = new HashMap<String, String>();
353 oldVars = oldWfInstance.getAllVars();
354 for (String var : oldVars.keySet()) {
355 String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
356 if (nodesToSkip.contains(actionName)) {
357 newVars.put(var, oldVars.get(var));
358 }
359 }
360 for (String node : nodesToSkip) {
361 // Setting the TO_SKIP variable to true. This will be used by
362 // SignalCommand and LiteNodeHandler to skip the action.
363 newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
364 String visitedFlag = NodeHandler.getLoopFlag(node);
365 // Removing the visited flag so that the action won't be considered
366 // a loop.
367 if (newVars.containsKey(visitedFlag)) {
368 newVars.remove(visitedFlag);
369 }
370 }
371 newWfInstance.setAllVars(newVars);
372 }
373
374 /* (non-Javadoc)
375 * @see org.apache.oozie.command.XCommand#getEntityKey()
376 */
377 @Override
378 public String getEntityKey() {
379 return this.jobId;
380 }
381
382 /* (non-Javadoc)
383 * @see org.apache.oozie.command.XCommand#isLockRequired()
384 */
385 @Override
386 protected boolean isLockRequired() {
387 return true;
388 }
389
390 /* (non-Javadoc)
391 * @see org.apache.oozie.command.XCommand#loadState()
392 */
393 @Override
394 protected void loadState() throws CommandException {
395 eagerLoadState();
396 }
397
398 /* (non-Javadoc)
399 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
400 */
401 @Override
402 protected void verifyPrecondition() throws CommandException, PreconditionException {
403 eagerVerifyPrecondition();
404 }
405 }