001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.WorkflowActionBean;
035    import org.apache.oozie.WorkflowJobBean;
036    import org.apache.oozie.client.OozieClient;
037    import org.apache.oozie.client.WorkflowAction;
038    import org.apache.oozie.client.WorkflowJob;
039    import org.apache.oozie.command.CommandException;
040    import org.apache.oozie.command.PreconditionException;
041    import org.apache.oozie.executor.jpa.JPAExecutorException;
042    import org.apache.oozie.executor.jpa.WorkflowActionDeleteJPAExecutor;
043    import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
044    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
045    import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
046    import org.apache.oozie.service.DagXLogInfoService;
047    import org.apache.oozie.service.HadoopAccessorException;
048    import org.apache.oozie.service.HadoopAccessorService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.Services;
051    import org.apache.oozie.service.WorkflowAppService;
052    import org.apache.oozie.service.WorkflowStoreService;
053    import org.apache.oozie.util.InstrumentUtils;
054    import org.apache.oozie.util.LogUtils;
055    import org.apache.oozie.util.ParamChecker;
056    import org.apache.oozie.util.PropertiesUtils;
057    import org.apache.oozie.util.XConfiguration;
058    import org.apache.oozie.util.XLog;
059    import org.apache.oozie.util.XmlUtils;
060    import org.apache.oozie.workflow.WorkflowApp;
061    import org.apache.oozie.workflow.WorkflowException;
062    import org.apache.oozie.workflow.WorkflowInstance;
063    import org.apache.oozie.workflow.WorkflowLib;
064    import org.apache.oozie.workflow.lite.NodeHandler;
065    
066    /**
067     * This is a RerunXCommand which is used for rerunn.
068     *
069     */
070    public class ReRunXCommand extends WorkflowXCommand<Void> {
071        private final String jobId;
072        private final Configuration conf;
073        private final String authToken;
074        private final Set<String> nodesToSkip = new HashSet<String>();
075        public static final String TO_SKIP = "TO_SKIP";
076        private WorkflowJobBean wfBean;
077        private List<WorkflowActionBean> actions;
078        private JPAService jpaService;
079    
080        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
081        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
082    
083        static {
084            String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
085                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
086                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
087                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
088            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
089    
090            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
091                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
092            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
093            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
094        }
095    
096        public ReRunXCommand(String jobId, Configuration conf, String authToken) {
097            super("rerun", "rerun", 1);
098            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
099            this.conf = ParamChecker.notNull(conf, "conf");
100            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
101        }
102    
103        /* (non-Javadoc)
104         * @see org.apache.oozie.command.XCommand#execute()
105         */
106        @Override
107        protected Void execute() throws CommandException {
108            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
109            LogUtils.setLogInfo(wfBean, logInfo);
110            WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
111            WorkflowInstance newWfInstance;
112    
113            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
114            try {
115                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
116                WorkflowApp app = wps.parseDef(conf, authToken);
117                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
118                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
119    
120                URI uri = new URI(conf.get(OozieClient.APP_PATH));
121                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(wfBean.getUser(),
122                        wfBean.getGroup(), uri, new Configuration());
123    
124                Path configDefault = null;
125                // app path could be a directory
126                Path path = new Path(uri.getPath());
127                if (!fs.isFile(path)) {
128                    configDefault = new Path(path, SubmitCommand.CONFIG_DEFAULT);
129                } else {
130                    configDefault = new Path(path.getParent(), SubmitCommand.CONFIG_DEFAULT);
131                }
132    
133                if (fs.exists(configDefault)) {
134                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
135                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
136                    XConfiguration.injectDefaults(defaultConf, conf);
137                }
138    
139                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
140    
141                try {
142                    newWfInstance = workflowLib.createInstance(app, conf, jobId);
143                }
144                catch (WorkflowException e) {
145                    throw new CommandException(e);
146                }
147                wfBean.setAppName(app.getName());
148                wfBean.setProtoActionConf(protoActionConf.toXmlString());
149            }
150            catch (WorkflowException ex) {
151                throw new CommandException(ex);
152            }
153            catch (IOException ex) {
154                throw new CommandException(ErrorCode.E0803, ex);
155            }
156            catch (HadoopAccessorException ex) {
157                throw new CommandException(ex);
158            }
159            catch (URISyntaxException ex) {
160                throw new CommandException(ErrorCode.E0711, ex.getMessage(), ex);
161            }
162    
163            try {
164                for (int i = 0; i < actions.size(); i++) {
165                    if (!nodesToSkip.contains(actions.get(i).getName())) {
166                        jpaService.execute(new WorkflowActionDeleteJPAExecutor(actions.get(i).getId()));
167                        LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
168                    }
169                    else {
170                        copyActionData(newWfInstance, oldWfInstance);
171                    }
172                }
173    
174                wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
175                wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
176                wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
177                wfBean.setUser(conf.get(OozieClient.USER_NAME));
178                wfBean.setGroup(conf.get(OozieClient.GROUP_NAME));
179                wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
180                wfBean.setEndTime(null);
181                wfBean.setRun(wfBean.getRun() + 1);
182                wfBean.setStatus(WorkflowJob.Status.PREP);
183                wfBean.setWorkflowInstance(newWfInstance);
184                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean));
185            }
186            catch (JPAExecutorException e) {
187                throw new CommandException(e);
188            }
189    
190            return null;
191        }
192    
193        /**
194         * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
195         * skipped node list
196         *
197         * @throws CommandException
198         */
199        @Override
200        protected void eagerLoadState() throws CommandException {
201            super.eagerLoadState();
202            try {
203                jpaService = Services.get().get(JPAService.class);
204                if (jpaService != null) {
205                    this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
206                    this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
207                }
208                else {
209                    throw new CommandException(ErrorCode.E0610);
210                }
211    
212                if (conf != null) {
213                    if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
214                        Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
215                        for (String str : skipNodes) {
216                            // trimming is required
217                            nodesToSkip.add(str.trim());
218                        }
219                        LOG.debug("Skipnode size :" + nodesToSkip.size());
220                    }
221                    else {
222                        for (WorkflowActionBean action : actions) { // Rerun from failed nodes
223                            if (action.getStatus() == WorkflowAction.Status.OK) {
224                                nodesToSkip.add(action.getName());
225                            }
226                        }
227                        LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
228                    }
229                    StringBuilder tmp = new StringBuilder();
230                    for (String node : nodesToSkip) {
231                        tmp.append(node).append(",");
232                    }
233                    LOG.debug("SkipNode List :" + tmp);
234                }
235            }
236            catch (Exception ex) {
237                throw new CommandException(ErrorCode.E0603, ex);
238            }
239        }
240    
241        /**
242         * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
243         * The nodes that are to be skipped are to be completed successfully in the base run.
244         *
245         * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions
246         */
247        @Override
248        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
249            super.eagerVerifyPrecondition();
250            if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
251                    || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
252                            WorkflowJob.Status.SUCCEEDED))) {
253                throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
254            }
255            Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
256            for (WorkflowActionBean action : actions) {
257                if (nodesToSkip.contains(action.getName())) {
258                    if (!action.getStatus().equals(WorkflowAction.Status.OK)
259                            && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
260                        throw new CommandException(ErrorCode.E0806, action.getName());
261                    }
262                    unmachedNodes.remove(action.getName());
263                }
264            }
265            if (unmachedNodes.size() > 0) {
266                StringBuilder sb = new StringBuilder();
267                String separator = "";
268                for (String s : unmachedNodes) {
269                    sb.append(separator).append(s);
270                    separator = ",";
271                }
272                throw new CommandException(ErrorCode.E0807, sb);
273            }
274        }
275    
276        /**
277         * Copys the variables for skipped nodes from the old wfInstance to new one.
278         *
279         * @param newWfInstance : Source WF instance object
280         * @param oldWfInstance : Update WF instance
281         */
282        private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
283            Map<String, String> oldVars = new HashMap<String, String>();
284            Map<String, String> newVars = new HashMap<String, String>();
285            oldVars = oldWfInstance.getAllVars();
286            for (String var : oldVars.keySet()) {
287                String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
288                if (nodesToSkip.contains(actionName)) {
289                    newVars.put(var, oldVars.get(var));
290                }
291            }
292            for (String node : nodesToSkip) {
293                // Setting the TO_SKIP variable to true. This will be used by
294                // SignalCommand and LiteNodeHandler to skip the action.
295                newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
296                String visitedFlag = NodeHandler.getLoopFlag(node);
297                // Removing the visited flag so that the action won't be considered
298                // a loop.
299                if (newVars.containsKey(visitedFlag)) {
300                    newVars.remove(visitedFlag);
301                }
302            }
303            newWfInstance.setAllVars(newVars);
304        }
305    
306        /* (non-Javadoc)
307         * @see org.apache.oozie.command.XCommand#getEntityKey()
308         */
309        @Override
310        protected String getEntityKey() {
311            return this.jobId;
312        }
313    
314        /* (non-Javadoc)
315         * @see org.apache.oozie.command.XCommand#isLockRequired()
316         */
317        @Override
318        protected boolean isLockRequired() {
319            return true;
320        }
321    
322        /* (non-Javadoc)
323         * @see org.apache.oozie.command.XCommand#loadState()
324         */
325        @Override
326        protected void loadState() throws CommandException {
327        }
328    
329        /* (non-Javadoc)
330         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
331         */
332        @Override
333        protected void verifyPrecondition() throws CommandException, PreconditionException {
334        }
335    }