/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;

/**
 * This command is used to rerun a workflow job.
 */
public class ReRunXCommand extends WorkflowXCommand<Void> {
    private final String jobId;
    private Configuration conf;
    private final String authToken;
    private final Set<String> nodesToSkip = new HashSet<String>();
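    // Suffix of the per-node workflow variable that marks a node as skipped on rerun; see copyActionData() below.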
    public static final String TO_SKIP = "TO_SKIP";
    private WorkflowJobBean wfBean;
    private List<WorkflowActionBean> actions;
    private JPAService jpaService;
    private List<JsonBean> updateList = new ArrayList<JsonBean>();
    private List<JsonBean> deleteList = new ArrayList<JsonBean>();

    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

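        // The config-default disallowed set is the union of the user-disallowed properties and HADOOP_USER.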
        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    public ReRunXCommand(String jobId, Configuration conf, String authToken) {
        super("rerun", "rerun", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        LogUtils.setLogInfo(wfBean, logInfo);
        WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
        WorkflowInstance newWfInstance;
        String appPath = null;

        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
        try {
            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
            WorkflowApp app = wps.parseDef(conf, authToken);
            XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();

            appPath = conf.get(OozieClient.APP_PATH);
            URI uri = new URI(appPath);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);

            Path configDefault = null;
            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
            } else {
                configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                XConfiguration.injectDefaults(defaultConf, conf);
            }

            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are
            // preserved. The Configuration.get function within XConfiguration.resolve() works recursively to get the
            // final value corresponding to a key in the map. Resetting the conf to contain all the resolved values is
            // necessary to ensure propagation of Oozie properties to Hadoop calls downstream.
            conf = ((XConfiguration) conf).resolve();

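            // Build a fresh WorkflowInstance for the same job id from the re-parsed app definition; variables for
            // skipped nodes are copied over from the old instance in copyActionData().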
            try {
                newWfInstance = workflowLib.createInstance(app, conf, jobId);
            }
            catch (WorkflowException e) {
                throw new CommandException(e);
            }
            wfBean.setAppName(app.getName());
            wfBean.setProtoActionConf(protoActionConf.toXmlString());
        }
        catch (WorkflowException ex) {
            throw new CommandException(ex);
        }
        catch (IOException ex) {
            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CommandException(ex);
        }
        catch (URISyntaxException ex) {
            throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
        }

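        // Actions that are not skipped are deleted so they run again; data for skipped actions is carried over from
        // the old workflow instance.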
        for (int i = 0; i < actions.size(); i++) {
            if (!nodesToSkip.contains(actions.get(i).getName())) {
                deleteList.add(actions.get(i));
                LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
            }
            else {
                copyActionData(newWfInstance, oldWfInstance);
            }
        }

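        // Reset the job bean for the new run: refresh the configuration, clear the end time, bump the run count and
        // move the job back to PREP.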
        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
        wfBean.setUser(conf.get(OozieClient.USER_NAME));
        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
        wfBean.setGroup(group);
        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
        wfBean.setEndTime(null);
        wfBean.setRun(wfBean.getRun() + 1);
        wfBean.setStatus(WorkflowJob.Status.PREP);
        wfBean.setWorkflowInstance(newWfInstance);

        try {
            wfBean.setLastModifiedTime(new Date());
            updateList.add(wfBean);
            // call JPAExecutor to do the bulk writes
            jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }

        return null;
    }

    /**
     * Loads the workflow job and its actions. Parses the configuration and adds the nodes that are to be skipped to
     * the skip-node list.
     *
     * @throws CommandException if the job state cannot be loaded
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        super.eagerLoadState();
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
                this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }

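            // Two rerun modes: an explicit skip list (RERUN_SKIP_NODES), or rerun-from-failed (RERUN_FAIL_NODES),
            // which skips every action that already completed OK.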
            if (conf != null) {
                if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { // rerun with skip nodes
                    Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
                    for (String str : skipNodes) {
                        // trimming is required
                        nodesToSkip.add(str.trim());
                    }
                    LOG.debug("Skip-node size: " + nodesToSkip.size());
                }
                else {
                    for (WorkflowActionBean action : actions) { // rerun from failed nodes
                        if (action.getStatus() == WorkflowAction.Status.OK) {
                            nodesToSkip.add(action.getName());
                        }
                    }
                    LOG.debug("Skip-node size for rerun from failed nodes: " + nodesToSkip.size());
                }
                StringBuilder tmp = new StringBuilder();
                for (String node : nodesToSkip) {
                    tmp.append(node).append(",");
                }
                LOG.debug("Skip-node list: " + tmp);
            }
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
        }
    }

    /**
     * Checks the preconditions required for a workflow rerun: the last run of the workflow must have completed, and
     * the nodes that are to be skipped must have completed successfully in the base run.
     *
     * @throws CommandException if a precondition is not met
     * @throws PreconditionException if a precondition is not met
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        super.eagerVerifyPrecondition();
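        // A rerun is only allowed once the previous run has reached a terminal state (SUCCEEDED, FAILED or KILLED).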
        if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
                || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
                WorkflowJob.Status.SUCCEEDED))) {
            throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
        }
        Set<String> unmatchedNodes = new HashSet<String>(nodesToSkip);
        for (WorkflowActionBean action : actions) {
            if (nodesToSkip.contains(action.getName())) {
                if (!action.getStatus().equals(WorkflowAction.Status.OK)
                        && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
                    throw new CommandException(ErrorCode.E0806, action.getName());
                }
                unmatchedNodes.remove(action.getName());
            }
        }
        if (unmatchedNodes.size() > 0) {
            StringBuilder sb = new StringBuilder();
            String separator = "";
            for (String s : unmatchedNodes) {
                sb.append(separator).append(s);
                separator = ",";
            }
            throw new CommandException(ErrorCode.E0807, sb);
        }
    }

    /**
     * Copies the variables for skipped nodes from the old workflow instance to the new one.
     *
     * @param newWfInstance : target WF instance to copy variables into
     * @param oldWfInstance : source WF instance to copy variables from
     */
    private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
        Map<String, String> oldVars = new HashMap<String, String>();
        Map<String, String> newVars = new HashMap<String, String>();
        oldVars = oldWfInstance.getAllVars();
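        // Workflow variables are keyed as <nodeName><NODE_VAR_SEPARATOR><variableName>; carry over every variable
        // that belongs to a skipped node.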
        for (String var : oldVars.keySet()) {
            String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
            if (nodesToSkip.contains(actionName)) {
                newVars.put(var, oldVars.get(var));
            }
        }
        for (String node : nodesToSkip) {
            // Setting the TO_SKIP variable to true. This will be used by
            // SignalCommand and LiteNodeHandler to skip the action.
            newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
            String visitedFlag = NodeHandler.getLoopFlag(node);
            // Removing the visited flag so that the action won't be considered
            // a loop.
            if (newVars.containsKey(visitedFlag)) {
                newVars.remove(visitedFlag);
            }
        }
        newWfInstance.setAllVars(newVars);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
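        // Nothing to do here; the job and its actions are loaded eagerly in eagerLoadState().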
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
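        // Nothing to do here; preconditions are checked eagerly in eagerVerifyPrecondition().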
    }
}