001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.HashSet;
021    import java.util.Set;
022    import java.io.IOException;
023    import java.util.Collection;
024    import java.util.HashMap;
025    import java.util.List;
026    import java.util.Map;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.oozie.client.WorkflowAction;
032    import org.apache.oozie.client.WorkflowJob;
033    import org.apache.oozie.client.OozieClient;
034    import org.apache.oozie.WorkflowActionBean;
035    import org.apache.oozie.WorkflowJobBean;
036    import org.apache.oozie.ErrorCode;
037    import org.apache.oozie.service.HadoopAccessorException;
038    import org.apache.oozie.service.WorkflowAppService;
039    import org.apache.oozie.service.Services;
040    import org.apache.oozie.service.DagXLogInfoService;
041    import org.apache.oozie.service.WorkflowStoreService;
042    import org.apache.oozie.service.HadoopAccessorService;
043    import org.apache.oozie.util.ParamChecker;
044    import org.apache.oozie.util.PropertiesUtils;
045    import org.apache.oozie.util.XLog;
046    import org.apache.oozie.util.XConfiguration;
047    import org.apache.oozie.util.XmlUtils;
048    import org.apache.oozie.command.Command;
049    import org.apache.oozie.command.CommandException;
050    import org.apache.oozie.store.StoreException;
051    import org.apache.oozie.store.WorkflowStore;
052    import org.apache.oozie.workflow.WorkflowApp;
053    import org.apache.oozie.workflow.WorkflowException;
054    import org.apache.oozie.workflow.WorkflowInstance;
055    import org.apache.oozie.workflow.WorkflowLib;
056    import org.apache.oozie.workflow.lite.NodeHandler;
057    
058    public class ReRunCommand extends WorkflowCommand<Void> {
059    
060        private String jobId;
061        private Configuration conf;
062        private String authToken;
063        private Set<String> nodesToSkip = new HashSet<String>();
064        public static final String TO_SKIP = "TO_SKIP";
065    
066        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
067        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
068    
069        static {
070            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
071                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
072                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
073                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
074            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
075    
076            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
077                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
078            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
079            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
080        }
081    
082        public ReRunCommand(String jobId, Configuration conf, String authToken) {
083            super("rerun", "rerun", 1, XLog.STD);
084            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
085            this.conf = ParamChecker.notNull(conf, "conf");
086            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
087        }
088    
089        /**
090         * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
091         * The nodes that are to be skipped are to be completed successfully in the base run.
092         *
093         * @param wfBean Workflow bean
094         * @param actions List of actions of Workflow
095         * @throws org.apache.oozie.command.CommandException On failure of pre-conditions
096         */
097        private void checkPreConditions(WorkflowJobBean wfBean, List<WorkflowActionBean> actions) throws CommandException {
098            if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
099                    || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
100                    WorkflowJob.Status.SUCCEEDED))) {
101                throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
102            }
103            Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
104            for (WorkflowActionBean action : actions) {
105                if (nodesToSkip.contains(action.getName())) {
106                    if (!action.getStatus().equals(WorkflowAction.Status.OK)
107                            && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
108                        throw new CommandException(ErrorCode.E0806, action.getName());
109                    }
110                    unmachedNodes.remove(action.getName());
111                }
112            }
113            if (unmachedNodes.size() > 0) {
114                StringBuilder sb = new StringBuilder();
115                String separator = "";
116                for (String s : unmachedNodes) {
117                    sb.append(separator).append(s);
118                    separator = ",";
119                }
120                throw new CommandException(ErrorCode.E0807, sb);
121            }
122        }
123    
124        /**
125         * Parses the config and adds the nodes that are to be skipped to the skipped node list
126         */
127        private void parseSkippedNodeConf() {
128            if (conf != null) {
129                Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
130                for (String str : skipNodes) {
131                    // trimming is required
132                    nodesToSkip.add(str.trim());
133                }
134            }
135        }
136    
137        protected Void call(WorkflowStore store) throws StoreException, CommandException {
138            incrJobCounter(1);
139            WorkflowJobBean wfBean = store.getWorkflow(jobId, false);
140            setLogInfo(wfBean);
141            List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, false);
142            WorkflowInstance oldWfInstance = wfBean.getWorkflowInstance();
143            WorkflowInstance newWfInstance;
144            XLog log = XLog.getLog(getClass());
145            parseSkippedNodeConf();
146            checkPreConditions(wfBean, actions);
147    
148            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
149            try {
150                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
151                WorkflowApp app = wps.parseDef(conf, authToken);
152                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
153                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
154    
155                Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(), SubmitCommand.CONFIG_DEFAULT);
156                FileSystem fs = Services.get().get(HadoopAccessorService.class).
157                        createFileSystem(wfBean.getUser(), wfBean.getGroup(), configDefault.toUri(), protoActionConf);
158    
159                if (fs.exists(configDefault)) {
160                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
161                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
162                    XConfiguration.injectDefaults(defaultConf, conf);
163                }
164    
165                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
166    
167                try {
168                    newWfInstance = workflowLib.createInstance(app, conf, jobId);
169                }
170                catch (WorkflowException e) {
171                    throw new StoreException(e);
172                }
173                wfBean.setAppName(app.getName());
174                wfBean.setProtoActionConf(protoActionConf.toXmlString());
175            }
176            catch (WorkflowException ex) {
177                throw new CommandException(ex);
178            }
179            catch (IOException ex) {
180                throw new CommandException(ErrorCode.E0803, ex);
181            }
182            catch (HadoopAccessorException e) {
183                throw new CommandException(e);
184            }
185    
186            for (int i = 0; i < actions.size(); i++) {
187                if (!nodesToSkip.contains(actions.get(i).getName())) {
188                    store.deleteAction(actions.get(i).getId());
189                    log.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
190                }
191                else {
192                    copyActionData(newWfInstance, oldWfInstance);
193                }
194            }
195    
196            wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
197            wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
198            wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
199            wfBean.setUser(conf.get(OozieClient.USER_NAME));
200            wfBean.setGroup(conf.get(OozieClient.GROUP_NAME));
201            wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
202            wfBean.setEndTime(null);
203            wfBean.setRun(wfBean.getRun() + 1);
204            wfBean.setStatus(WorkflowJob.Status.PREP);
205            wfBean.setWorkflowInstance(newWfInstance);
206            store.updateWorkflow(wfBean);
207            return null;
208        }
209    
210        /**
211         * Copys the variables for skipped nodes from the old wfInstance to new one.
212         *
213         * @param newWfInstance
214         * @param oldWfInstance
215         */
216        private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
217            Map<String, String> oldVars = new HashMap<String, String>();
218            Map<String, String> newVars = new HashMap<String, String>();
219            oldVars = oldWfInstance.getAllVars();
220            for (String var : oldVars.keySet()) {
221                String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
222                if (nodesToSkip.contains(actionName)) {
223                    newVars.put(var, oldVars.get(var));
224                }
225            }
226            for (String node : nodesToSkip) {
227                // Setting the TO_SKIP variable to true. This will be used by
228                // SignalCommand and LiteNodeHandler to skip the action.
229                newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
230                String visitedFlag = NodeHandler.getLoopFlag(node);
231                // Removing the visited flag so that the action won't be considered
232                // a loop.
233                if (newVars.containsKey(visitedFlag)) {
234                    newVars.remove(visitedFlag);
235                }
236            }
237            newWfInstance.setAllVars(newVars);
238        }
239    
240        @Override
241        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
242            try {
243                XLog.getLog(getClass()).debug("STARTED ReRunCommand for job " + jobId);
244                if (lock(jobId)) {
245                    call(store);
246                }
247                else {
248                    queueCallable(new ReRunCommand(jobId, conf, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
249                    XLog.getLog(getClass()).warn("ReRunCommand lock was not acquired - failed {0}", jobId);
250                }
251            }
252            catch (InterruptedException e) {
253                queueCallable(new ReRunCommand(jobId, conf, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
254                XLog.getLog(getClass())
255                        .warn("ReRunCommand lock was not acquired - interrupted exception failed {0}", jobId);
256            }
257            XLog.getLog(getClass()).debug("ENDED ReRunCommand for job " + jobId);
258            return null;
259        }
260    }