001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.sla.SLAOperations;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.jdom.Element;
import org.jdom.JDOMException;
077    
078    /**
079     * This is a RerunXCommand which is used for rerunn.
080     *
081     */
082    public class ReRunXCommand extends WorkflowXCommand<Void> {
083        private final String jobId;
084        private Configuration conf;
085        private final Set<String> nodesToSkip = new HashSet<String>();
086        public static final String TO_SKIP = "TO_SKIP";
087        private WorkflowJobBean wfBean;
088        private List<WorkflowActionBean> actions;
089        private JPAService jpaService;
090        private List<JsonBean> updateList = new ArrayList<JsonBean>();
091        private List<JsonBean> deleteList = new ArrayList<JsonBean>();
092    
093        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
094        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
095    
096        static {
097            String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
098                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
099                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
100                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
101            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
102    
103            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
104            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
105            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
106        }
107    
108        public ReRunXCommand(String jobId, Configuration conf) {
109            super("rerun", "rerun", 1);
110            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
111            this.conf = ParamChecker.notNull(conf, "conf");
112        }
113    
114        /* (non-Javadoc)
115         * @see org.apache.oozie.command.XCommand#execute()
116         */
117        @Override
118        protected Void execute() throws CommandException {
119            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
120            LogUtils.setLogInfo(wfBean, logInfo);
121            WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
122            WorkflowInstance newWfInstance;
123            String appPath = null;
124    
125            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
126            try {
127                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
128                WorkflowApp app = wps.parseDef(conf);
129                XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
130                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
131    
132                appPath = conf.get(OozieClient.APP_PATH);
133                URI uri = new URI(appPath);
134                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
135                Configuration fsConf = has.createJobConf(uri.getAuthority());
136                FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);
137    
138                Path configDefault = null;
139                // app path could be a directory
140                Path path = new Path(uri.getPath());
141                if (!fs.isFile(path)) {
142                    configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
143                } else {
144                    configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
145                }
146    
147                if (fs.exists(configDefault)) {
148                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
149                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
150                    XConfiguration.injectDefaults(defaultConf, conf);
151                }
152    
153                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
154    
155                // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are preserved.
156                // The Configuration.get function within XConfiguration.resolve() works recursively to get the final value corresponding to a key in the map
157                // Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
158                conf = ((XConfiguration) conf).resolve();
159    
160                try {
161                    newWfInstance = workflowLib.createInstance(app, conf, jobId);
162                }
163                catch (WorkflowException e) {
164                    throw new CommandException(e);
165                }
166    
167                if (SLAService.isEnabled()) {
168                    Element wfElem = XmlUtils.parseXml(app.getDefinition());
169                    ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
170                    Element eSla = XmlUtils.getSLAElement(wfElem);
171                    String jobSlaXml = null;
172                    if (eSla != null) {
173                        jobSlaXml = SubmitXCommand.resolveSla(eSla, evalSla);
174                    }
175                    String appName = ELUtils.resolveAppName(app.getName(), conf);
176                    writeSLARegistration(wfElem, jobSlaXml, newWfInstance.getId(),
177                                conf.get(SubWorkflowActionExecutor.PARENT_ID), conf.get(OozieClient.USER_NAME), appName,
178                                evalSla);
179                }
180                wfBean.setAppName(app.getName());
181                wfBean.setProtoActionConf(protoActionConf.toXmlString());
182            }
183            catch (WorkflowException ex) {
184                throw new CommandException(ex);
185            }
186            catch (IOException ex) {
187                throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
188            }
189            catch (HadoopAccessorException ex) {
190                throw new CommandException(ex);
191            }
192            catch (URISyntaxException ex) {
193                throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
194            }
195            catch (Exception ex) {
196                throw new CommandException(ErrorCode.E1007, ex.getMessage(), ex);
197            }
198    
199            for (int i = 0; i < actions.size(); i++) {
200                if (!nodesToSkip.contains(actions.get(i).getName())) {
201                    deleteList.add(actions.get(i));
202                    LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
203                }
204                else {
205                    copyActionData(newWfInstance, oldWfInstance);
206                }
207            }
208    
209            wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
210            wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
211            wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
212            wfBean.setUser(conf.get(OozieClient.USER_NAME));
213            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
214            wfBean.setGroup(group);
215            wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
216            wfBean.setEndTime(null);
217            wfBean.setRun(wfBean.getRun() + 1);
218            wfBean.setStatus(WorkflowJob.Status.PREP);
219            wfBean.setWorkflowInstance(newWfInstance);
220    
221            try {
222                wfBean.setLastModifiedTime(new Date());
223                updateList.add(wfBean);
224                // call JPAExecutor to do the bulk writes
225                jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
226            }
227            catch (JPAExecutorException je) {
228                throw new CommandException(je);
229            }
230    
231            return null;
232        }
233    
234    
235        @SuppressWarnings("unchecked")
236            private void writeSLARegistration(Element wfElem, String jobSlaXml, String id, String parentId, String user,
237                String appName, ELEvaluator evalSla) throws JDOMException, CommandException {
238            if (jobSlaXml != null && jobSlaXml.length() > 0) {
239                Element eSla = XmlUtils.parseXml(jobSlaXml);
240                // insert into new table
241                SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName, LOG,
242                        true);
243            }
244            // Add sla for wf actions
245            for (Element action : (List<Element>) wfElem.getChildren("action", wfElem.getNamespace())) {
246                Element actionSla = XmlUtils.getSLAElement(action);
247                if (actionSla != null) {
248                    String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
249                    actionSla = XmlUtils.parseXml(actionSlaXml);
250                    if (!nodesToSkip.contains(action.getAttributeValue("name"))) {
251                        String actionId = Services.get().get(UUIDService.class)
252                                .generateChildId(jobId, action.getAttributeValue("name") + "");
253                        SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION, user,
254                                appName, LOG, true);
255                    }
256                }
257            }
258    
259        }
260    
261        /**
262         * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
263         * skipped node list
264         *
265         * @throws CommandException
266         */
267        @Override
268        protected void eagerLoadState() throws CommandException {
269            super.eagerLoadState();
270            try {
271                jpaService = Services.get().get(JPAService.class);
272                if (jpaService != null) {
273                    this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
274                    this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
275                }
276                else {
277                    throw new CommandException(ErrorCode.E0610);
278                }
279    
280                if (conf != null) {
281                    if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
282                        Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
283                        for (String str : skipNodes) {
284                            // trimming is required
285                            nodesToSkip.add(str.trim());
286                        }
287                        LOG.debug("Skipnode size :" + nodesToSkip.size());
288                    }
289                    else {
290                        for (WorkflowActionBean action : actions) { // Rerun from failed nodes
291                            if (action.getStatus() == WorkflowAction.Status.OK) {
292                                nodesToSkip.add(action.getName());
293                            }
294                        }
295                        LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
296                    }
297                    StringBuilder tmp = new StringBuilder();
298                    for (String node : nodesToSkip) {
299                        tmp.append(node).append(",");
300                    }
301                    LOG.debug("SkipNode List :" + tmp);
302                }
303            }
304            catch (Exception ex) {
305                throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
306            }
307        }
308    
309        /**
310         * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
311         * The nodes that are to be skipped are to be completed successfully in the base run.
312         *
313         * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions
314         */
315        @Override
316        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
317            super.eagerVerifyPrecondition();
318            if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
319                    || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
320                            WorkflowJob.Status.SUCCEEDED))) {
321                throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
322            }
323            Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
324            for (WorkflowActionBean action : actions) {
325                if (nodesToSkip.contains(action.getName())) {
326                    if (!action.getStatus().equals(WorkflowAction.Status.OK)
327                            && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
328                        throw new CommandException(ErrorCode.E0806, action.getName());
329                    }
330                    unmachedNodes.remove(action.getName());
331                }
332            }
333            if (unmachedNodes.size() > 0) {
334                StringBuilder sb = new StringBuilder();
335                String separator = "";
336                for (String s : unmachedNodes) {
337                    sb.append(separator).append(s);
338                    separator = ",";
339                }
340                throw new CommandException(ErrorCode.E0807, sb);
341            }
342        }
343    
344        /**
345         * Copys the variables for skipped nodes from the old wfInstance to new one.
346         *
347         * @param newWfInstance : Source WF instance object
348         * @param oldWfInstance : Update WF instance
349         */
350        private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
351            Map<String, String> oldVars = new HashMap<String, String>();
352            Map<String, String> newVars = new HashMap<String, String>();
353            oldVars = oldWfInstance.getAllVars();
354            for (String var : oldVars.keySet()) {
355                String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
356                if (nodesToSkip.contains(actionName)) {
357                    newVars.put(var, oldVars.get(var));
358                }
359            }
360            for (String node : nodesToSkip) {
361                // Setting the TO_SKIP variable to true. This will be used by
362                // SignalCommand and LiteNodeHandler to skip the action.
363                newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
364                String visitedFlag = NodeHandler.getLoopFlag(node);
365                // Removing the visited flag so that the action won't be considered
366                // a loop.
367                if (newVars.containsKey(visitedFlag)) {
368                    newVars.remove(visitedFlag);
369                }
370            }
371            newWfInstance.setAllVars(newVars);
372        }
373    
374        /* (non-Javadoc)
375         * @see org.apache.oozie.command.XCommand#getEntityKey()
376         */
377        @Override
378        public String getEntityKey() {
379            return this.jobId;
380        }
381    
382        /* (non-Javadoc)
383         * @see org.apache.oozie.command.XCommand#isLockRequired()
384         */
385        @Override
386        protected boolean isLockRequired() {
387            return true;
388        }
389    
390        /* (non-Javadoc)
391         * @see org.apache.oozie.command.XCommand#loadState()
392         */
393        @Override
394        protected void loadState() throws CommandException {
395            eagerLoadState();
396        }
397    
398        /* (non-Javadoc)
399         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
400         */
401        @Override
402        protected void verifyPrecondition() throws CommandException, PreconditionException {
403            eagerVerifyPrecondition();
404        }
405    }