001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.jdom.Element;
import org.jdom.JDOMException;
081
082/**
083 * This is a RerunXCommand which is used for rerunn.
084 *
085 */
086public class ReRunXCommand extends WorkflowXCommand<Void> {
087    private final String jobId;
088    private Configuration conf;
089    private final Set<String> nodesToSkip = new HashSet<String>();
090    public static final String TO_SKIP = "TO_SKIP";
091    private WorkflowJobBean wfBean;
092    private List<WorkflowActionBean> actions;
093    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
094    private List<JsonBean> deleteList = new ArrayList<JsonBean>();
095
096    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
097    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
098    public static final String DISABLE_CHILD_RERUN = "oozie.wf.rerun.disablechild";
099
100    static {
101        String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
102                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
103                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
104                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
105        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
106
107        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
108        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
109        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
110    }
111
112    public ReRunXCommand(String jobId, Configuration conf) {
113        super("rerun", "rerun", 1);
114        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
115        this.conf = ParamChecker.notNull(conf, "conf");
116    }
117
118    @Override
119    protected void setLogInfo() {
120        LogUtils.setLogInfo(jobId);
121    }
122
123    /* (non-Javadoc)
124     * @see org.apache.oozie.command.XCommand#execute()
125     */
126    @Override
127    protected Void execute() throws CommandException {
128        setupReRun();
129        startWorkflow(jobId);
130        return null;
131    }
132
133    private void startWorkflow(String jobId) throws CommandException {
134        new StartXCommand(jobId).call();
135    }
136
137    private void setupReRun() throws CommandException {
138        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
139        LogUtils.setLogInfo(wfBean);
140        WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
141        WorkflowInstance newWfInstance;
142        String appPath = null;
143
144        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
145        try {
146            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
147            WorkflowApp app = wps.parseDef(conf, null);
148            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
149            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
150
151            appPath = conf.get(OozieClient.APP_PATH);
152            URI uri = new URI(appPath);
153            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
154            Configuration fsConf = has.createJobConf(uri.getAuthority());
155            FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);
156
157            Path configDefault = null;
158            // app path could be a directory
159            Path path = new Path(uri.getPath());
160            if (!fs.isFile(path)) {
161                configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
162            }
163            else {
164                configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
165            }
166
167            if (fs.exists(configDefault)) {
168                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
169                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
170                XConfiguration.injectDefaults(defaultConf, conf);
171            }
172
173            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
174
175            // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are
176            // preserved. The Configuration.get function within XConfiguration.resolve() works recursively to get the
177            // final value corresponding to a key in the map Resetting the conf to contain all the resolved values is
178            // necessary to ensure propagation of Oozie properties to Hadoop calls downstream
179            conf = ((XConfiguration) conf).resolve();
180
181            try {
182                newWfInstance = workflowLib.createInstance(app, conf, jobId);
183            }
184            catch (WorkflowException e) {
185                throw new CommandException(e);
186            }
187            String appName = ELUtils.resolveAppName(app.getName(), conf);
188            if (SLAService.isEnabled()) {
189                Element wfElem = XmlUtils.parseXml(app.getDefinition());
190                ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
191                Element eSla = XmlUtils.getSLAElement(wfElem);
192                String jobSlaXml = null;
193                if (eSla != null) {
194                    jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
195                }
196                writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
197                        conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
198                        evalSla);
199            }
200            wfBean.setAppName(appName);
201            wfBean.setProtoActionConf(protoActionConf.toXmlString());
202        }
203        catch (WorkflowException ex) {
204            throw new CommandException(ex);
205        }
206        catch (IOException ex) {
207            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
208        }
209        catch (HadoopAccessorException ex) {
210            throw new CommandException(ex);
211        }
212        catch (URISyntaxException ex) {
213            throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
214        }
215        catch (Exception ex) {
216            throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
217        }
218
219        for (int i = 0; i < actions.size(); i++) {
220            // Skipping to delete the sub workflow when rerun failed node option has been provided. As same
221            // action will be used to rerun the job.
222            if (!nodesToSkip.contains(actions.get(i).getName()) &&
223                    !(conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) &&
224                    SubWorkflowActionExecutor.ACTION_TYPE.equals(actions.get(i).getType()))) {
225                deleteList.add(actions.get(i));
226                LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
227            }
228            else {
229                copyActionData(newWfInstance, oldWfInstance);
230            }
231        }
232
233        wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
234        wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
235        wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
236        wfBean.setUser(conf.get(OozieClient.USER_NAME));
237        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
238        wfBean.setGroup(group);
239        wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
240        wfBean.setEndTime(null);
241        wfBean.setRun(wfBean.getRun() + 1);
242        wfBean.setStatus(WorkflowJob.Status.PREP);
243        wfBean.setWorkflowInstance(newWfInstance);
244
245        try {
246            wfBean.setLastModifiedTime(new Date());
247            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_RERUN, wfBean));
248            // call JPAExecutor to do the bulk writes
249            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
250        }
251        catch (JPAExecutorException je) {
252            throw new CommandException(je);
253        }
254        finally {
255            updateParentIfNecessary(wfBean);
256        }
257
258    }
259
260    @SuppressWarnings("unchecked")
261        private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
262            String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
263        if (jobSlaXml != null && jobSlaXml.length() > 0) {
264            Element eSla = XmlUtils.parseXml(jobSlaXml);
265            // insert into new table
266            SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
267                    true);
268        }
269        // Add sla for wf actions
270        for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
271            Element actionSla = XmlUtils.getSLAElement(action);
272            if (actionSla != null) {
273                String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
274                actionSla = XmlUtils.parseXml(actionSlaXml);
275                if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
276                    String actionId = Services.get().get(UUIDService.class)
277                            .generateChildId(jobId, action.getAttributeValue("name") + "");
278                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
279                            appName, LOG, true);
280                }
281            }
282        }
283
284    }
285
286    /**
287     * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
288     * skipped node list
289     *
290     * @throws CommandException
291     */
292    @Override
293    protected void eagerLoadState() throws CommandException {
294        try {
295            this.wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, this.jobId);
296            this.actions = WorkflowActionQueryExecutor.getInstance().getList(
297                    WorkflowActionQuery.GET_ACTIONS_FOR_WORKFLOW_RERUN, this.jobId);
298
299            if (conf != null) {
300                if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
301                    Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
302                    for (String str : skipNodes) {
303                        // trimming is required
304                        nodesToSkip.add(str.trim());
305                    }
306                    LOG.debug("Skipnode size :" + nodesToSkip.size());
307                }
308                else {
309                    for (WorkflowActionBean action : actions) { // Rerun from failed nodes
310                        if (action.getStatus() == WorkflowAction.Status.OK) {
311                            nodesToSkip.add(action.getName());
312                        }
313                    }
314                    LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
315                }
316                StringBuilder tmp = new StringBuilder();
317                for (String node : nodesToSkip) {
318                    tmp.append(node).append(",");
319                }
320                LOG.debug("SkipNode List :" + tmp);
321            }
322        }
323        catch (Exception ex) {
324            throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
325        }
326    }
327
328    /**
329     * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
330     * The nodes that are to be skipped are to be completed successfully in the base run.
331     *
332     * @throws CommandException
333     * @throws PreconditionException On failure of pre-conditions
334     */
335    @Override
336    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
337        // Throwing error if parent exist and same workflow trying to rerun, when running child workflow disabled
338        // through conf.
339        if (wfBean.getParentId() != null && !conf.getBoolean(SubWorkflowActionExecutor.SUBWORKFLOW_RERUN, false)
340                && ConfigurationService.getBoolean(DISABLE_CHILD_RERUN)) {
341            throw new CommandException(ErrorCode.E0755, " Rerun is not allowed through child workflow, please" +
342                    " re-run through the parent " + wfBean.getParentId());
343        }
344
345        if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
346                || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
347                        WorkflowJob.Status.SUCCEEDED))) {
348            throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
349        }
350        Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
351        for (WorkflowActionBean action : actions) {
352            if (nodesToSkip.contains(action.getName())) {
353                if (!action.getStatus().equals(WorkflowAction.Status.OK)
354                        && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
355                    throw new CommandException(ErrorCode.E0806, action.getName());
356                }
357                unmachedNodes.remove(action.getName());
358            }
359        }
360        if (unmachedNodes.size() > 0) {
361            StringBuilder sb = new StringBuilder();
362            String separator = "";
363            for (String s : unmachedNodes) {
364                sb.append(separator).append(s);
365                separator = ",";
366            }
367            throw new CommandException(ErrorCode.E0807, sb);
368        }
369    }
370
371    /**
372     * Copys the variables for skipped nodes from the old wfInstance to new one.
373     *
374     * @param newWfInstance : Source WF instance object
375     * @param oldWfInstance : Update WF instance
376     */
377    private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
378        Map<String, String> oldVars = new HashMap<String, String>();
379        Map<String, String> newVars = new HashMap<String, String>();
380        oldVars = oldWfInstance.getAllVars();
381        for (String var : oldVars.keySet()) {
382            String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
383            if (nodesToSkip.contains(actionName)) {
384                newVars.put(var, oldVars.get(var));
385            }
386        }
387        for (String node : nodesToSkip) {
388            // Setting the TO_SKIP variable to true. This will be used by
389            // SignalCommand and LiteNodeHandler to skip the action.
390            newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
391            String visitedFlag = NodeHandler.getLoopFlag(node);
392            // Removing the visited flag so that the action won't be considered
393            // a loop.
394            if (newVars.containsKey(visitedFlag)) {
395                newVars.remove(visitedFlag);
396            }
397        }
398        newWfInstance.setAllVars(newVars);
399    }
400
401    /* (non-Javadoc)
402     * @see org.apache.oozie.command.XCommand#getEntityKey()
403     */
404    @Override
405    public String getEntityKey() {
406        return this.jobId;
407    }
408
409    /* (non-Javadoc)
410     * @see org.apache.oozie.command.XCommand#isLockRequired()
411     */
412    @Override
413    protected boolean isLockRequired() {
414        return true;
415    }
416
417    /* (non-Javadoc)
418     * @see org.apache.oozie.command.XCommand#loadState()
419     */
420    @Override
421    protected void loadState() throws CommandException {
422        try {
423            this.wfBean = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_RERUN, this.jobId);
424            this.actions = WorkflowActionQueryExecutor.getInstance().getList(
425                    WorkflowActionQuery.GET_ACTIONS_FOR_WORKFLOW_RERUN, this.jobId);
426        }
427        catch (JPAExecutorException jpe) {
428            throw new CommandException(jpe);
429        }
430    }
431
432    /* (non-Javadoc)
433     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
434     */
435    @Override
436    protected void verifyPrecondition() throws CommandException, PreconditionException {
437        eagerVerifyPrecondition();
438    }
439}