001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.lite.NodeHandler;
068    
069    /**
070     * This is a RerunXCommand which is used for rerunn.
071     *
072     */
073    public class ReRunXCommand extends WorkflowXCommand<Void> {
074        private final String jobId;
075        private Configuration conf;
076        private final String authToken;
077        private final Set<String> nodesToSkip = new HashSet<String>();
078        public static final String TO_SKIP = "TO_SKIP";
079        private WorkflowJobBean wfBean;
080        private List<WorkflowActionBean> actions;
081        private JPAService jpaService;
082        private List<JsonBean> updateList = new ArrayList<JsonBean>();
083        private List<JsonBean> deleteList = new ArrayList<JsonBean>();
084    
085        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
086        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
087    
088        static {
089            String[] badUserProps = { PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
090                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
091                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
092                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
093            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
094    
095            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
096            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098        }
099    
100        public ReRunXCommand(String jobId, Configuration conf, String authToken) {
101            super("rerun", "rerun", 1);
102            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
103            this.conf = ParamChecker.notNull(conf, "conf");
104            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
105        }
106    
107        /* (non-Javadoc)
108         * @see org.apache.oozie.command.XCommand#execute()
109         */
110        @Override
111        protected Void execute() throws CommandException {
112            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
113            LogUtils.setLogInfo(wfBean, logInfo);
114            WorkflowInstance oldWfInstance = this.wfBean.getWorkflowInstance();
115            WorkflowInstance newWfInstance;
116            String appPath = null;
117    
118            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
119            try {
120                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
121                WorkflowApp app = wps.parseDef(conf, authToken);
122                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
123                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
124    
125                appPath = conf.get(OozieClient.APP_PATH);
126                URI uri = new URI(appPath);
127                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
128                Configuration fsConf = has.createJobConf(uri.getAuthority());
129                FileSystem fs = has.createFileSystem(wfBean.getUser(), uri, fsConf);
130    
131                Path configDefault = null;
132                // app path could be a directory
133                Path path = new Path(uri.getPath());
134                if (!fs.isFile(path)) {
135                    configDefault = new Path(path, SubmitXCommand.CONFIG_DEFAULT);
136                } else {
137                    configDefault = new Path(path.getParent(), SubmitXCommand.CONFIG_DEFAULT);
138                }
139    
140                if (fs.exists(configDefault)) {
141                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
142                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
143                    XConfiguration.injectDefaults(defaultConf, conf);
144                }
145    
146                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
147    
148                // Resolving all variables in the job properties. This ensures the Hadoop Configuration semantics are preserved.
149                // The Configuration.get function within XConfiguration.resolve() works recursively to get the final value corresponding to a key in the map
150                // Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
151                conf = ((XConfiguration) conf).resolve();
152    
153                try {
154                    newWfInstance = workflowLib.createInstance(app, conf, jobId);
155                }
156                catch (WorkflowException e) {
157                    throw new CommandException(e);
158                }
159                wfBean.setAppName(app.getName());
160                wfBean.setProtoActionConf(protoActionConf.toXmlString());
161            }
162            catch (WorkflowException ex) {
163                throw new CommandException(ex);
164            }
165            catch (IOException ex) {
166                throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
167            }
168            catch (HadoopAccessorException ex) {
169                throw new CommandException(ex);
170            }
171            catch (URISyntaxException ex) {
172                throw new CommandException(ErrorCode.E0711, appPath, ex.getMessage(), ex);
173            }
174    
175            for (int i = 0; i < actions.size(); i++) {
176                if (!nodesToSkip.contains(actions.get(i).getName())) {
177                    deleteList.add(actions.get(i));
178                    LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId());
179                }
180                else {
181                    copyActionData(newWfInstance, oldWfInstance);
182                }
183            }
184    
185            wfBean.setAppPath(conf.get(OozieClient.APP_PATH));
186            wfBean.setConf(XmlUtils.prettyPrint(conf).toString());
187            wfBean.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
188            wfBean.setUser(conf.get(OozieClient.USER_NAME));
189            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
190            wfBean.setGroup(group);
191            wfBean.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
192            wfBean.setEndTime(null);
193            wfBean.setRun(wfBean.getRun() + 1);
194            wfBean.setStatus(WorkflowJob.Status.PREP);
195            wfBean.setWorkflowInstance(newWfInstance);
196    
197            try {
198                wfBean.setLastModifiedTime(new Date());
199                updateList.add(wfBean);
200                // call JPAExecutor to do the bulk writes
201                jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, true));
202            }
203            catch (JPAExecutorException je) {
204                throw new CommandException(je);
205            }
206    
207            return null;
208        }
209    
210        /**
211         * Loading the Wfjob and workflow actions. Parses the config and adds the nodes that are to be skipped to the
212         * skipped node list
213         *
214         * @throws CommandException
215         */
216        @Override
217        protected void eagerLoadState() throws CommandException {
218            super.eagerLoadState();
219            try {
220                jpaService = Services.get().get(JPAService.class);
221                if (jpaService != null) {
222                    this.wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(this.jobId));
223                    this.actions = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(this.jobId));
224                }
225                else {
226                    throw new CommandException(ErrorCode.E0610);
227                }
228    
229                if (conf != null) {
230                    if (conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) == false) { //Rerun with skipNodes
231                        Collection<String> skipNodes = conf.getStringCollection(OozieClient.RERUN_SKIP_NODES);
232                        for (String str : skipNodes) {
233                            // trimming is required
234                            nodesToSkip.add(str.trim());
235                        }
236                        LOG.debug("Skipnode size :" + nodesToSkip.size());
237                    }
238                    else {
239                        for (WorkflowActionBean action : actions) { // Rerun from failed nodes
240                            if (action.getStatus() == WorkflowAction.Status.OK) {
241                                nodesToSkip.add(action.getName());
242                            }
243                        }
244                        LOG.debug("Skipnode size are to rerun from FAIL nodes :" + nodesToSkip.size());
245                    }
246                    StringBuilder tmp = new StringBuilder();
247                    for (String node : nodesToSkip) {
248                        tmp.append(node).append(",");
249                    }
250                    LOG.debug("SkipNode List :" + tmp);
251                }
252            }
253            catch (Exception ex) {
254                throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex);
255            }
256        }
257    
258        /**
259         * Checks the pre-conditions that are required for workflow to recover - Last run of Workflow should be completed -
260         * The nodes that are to be skipped are to be completed successfully in the base run.
261         *
262         * @throws org.apache.oozie.command.CommandException,PreconditionException On failure of pre-conditions
263         */
264        @Override
265        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
266            super.eagerVerifyPrecondition();
267            if (!(wfBean.getStatus().equals(WorkflowJob.Status.FAILED)
268                    || wfBean.getStatus().equals(WorkflowJob.Status.KILLED) || wfBean.getStatus().equals(
269                            WorkflowJob.Status.SUCCEEDED))) {
270                throw new CommandException(ErrorCode.E0805, wfBean.getStatus());
271            }
272            Set<String> unmachedNodes = new HashSet<String>(nodesToSkip);
273            for (WorkflowActionBean action : actions) {
274                if (nodesToSkip.contains(action.getName())) {
275                    if (!action.getStatus().equals(WorkflowAction.Status.OK)
276                            && !action.getStatus().equals(WorkflowAction.Status.ERROR)) {
277                        throw new CommandException(ErrorCode.E0806, action.getName());
278                    }
279                    unmachedNodes.remove(action.getName());
280                }
281            }
282            if (unmachedNodes.size() > 0) {
283                StringBuilder sb = new StringBuilder();
284                String separator = "";
285                for (String s : unmachedNodes) {
286                    sb.append(separator).append(s);
287                    separator = ",";
288                }
289                throw new CommandException(ErrorCode.E0807, sb);
290            }
291        }
292    
293        /**
294         * Copys the variables for skipped nodes from the old wfInstance to new one.
295         *
296         * @param newWfInstance : Source WF instance object
297         * @param oldWfInstance : Update WF instance
298         */
299        private void copyActionData(WorkflowInstance newWfInstance, WorkflowInstance oldWfInstance) {
300            Map<String, String> oldVars = new HashMap<String, String>();
301            Map<String, String> newVars = new HashMap<String, String>();
302            oldVars = oldWfInstance.getAllVars();
303            for (String var : oldVars.keySet()) {
304                String actionName = var.split(WorkflowInstance.NODE_VAR_SEPARATOR)[0];
305                if (nodesToSkip.contains(actionName)) {
306                    newVars.put(var, oldVars.get(var));
307                }
308            }
309            for (String node : nodesToSkip) {
310                // Setting the TO_SKIP variable to true. This will be used by
311                // SignalCommand and LiteNodeHandler to skip the action.
312                newVars.put(node + WorkflowInstance.NODE_VAR_SEPARATOR + TO_SKIP, "true");
313                String visitedFlag = NodeHandler.getLoopFlag(node);
314                // Removing the visited flag so that the action won't be considered
315                // a loop.
316                if (newVars.containsKey(visitedFlag)) {
317                    newVars.remove(visitedFlag);
318                }
319            }
320            newWfInstance.setAllVars(newVars);
321        }
322    
323        /* (non-Javadoc)
324         * @see org.apache.oozie.command.XCommand#getEntityKey()
325         */
326        @Override
327        public String getEntityKey() {
328            return this.jobId;
329        }
330    
331        /* (non-Javadoc)
332         * @see org.apache.oozie.command.XCommand#isLockRequired()
333         */
334        @Override
335        protected boolean isLockRequired() {
336            return true;
337        }
338    
339        /* (non-Javadoc)
340         * @see org.apache.oozie.command.XCommand#loadState()
341         */
342        @Override
343        protected void loadState() throws CommandException {
344        }
345    
346        /* (non-Javadoc)
347         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
348         */
349        @Override
350        protected void verifyPrecondition() throws CommandException, PreconditionException {
351        }
352    }