001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.fs.Path;
026    import org.apache.oozie.CoordinatorActionBean;
027    import org.apache.oozie.CoordinatorActionInfo;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.SLAEventBean;
031    import org.apache.oozie.XException;
032    import org.apache.oozie.action.ActionExecutorException;
033    import org.apache.oozie.action.hadoop.FsActionExecutor;
034    import org.apache.oozie.client.CoordinatorAction;
035    import org.apache.oozie.client.CoordinatorJob;
036    import org.apache.oozie.client.Job;
037    import org.apache.oozie.client.SLAEvent.SlaAppType;
038    import org.apache.oozie.client.rest.RestConstants;
039    import org.apache.oozie.command.CommandException;
040    import org.apache.oozie.command.PreconditionException;
041    import org.apache.oozie.command.RerunTransitionXCommand;
042    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
043    import org.apache.oozie.coord.CoordELFunctions;
044    import org.apache.oozie.coord.CoordUtils;
045    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.JPAExecutorException;
048    import org.apache.oozie.service.EventHandlerService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.Services;
051    import org.apache.oozie.sla.SLAOperations;
052    import org.apache.oozie.sla.service.SLAService;
053    import org.apache.oozie.util.InstrumentUtils;
054    import org.apache.oozie.util.LogUtils;
055    import org.apache.oozie.util.ParamChecker;
056    import org.apache.oozie.util.StatusUtils;
057    import org.apache.oozie.util.XConfiguration;
058    import org.apache.oozie.util.XLog;
059    import org.apache.oozie.util.XmlUtils;
060    import org.apache.oozie.util.db.SLADbOperations;
061    import org.jdom.Element;
062    import org.jdom.JDOMException;
063    
064    /**
065     * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
066     * <p/>
067     * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or
068     * {@link RestConstants.JOB_COORD_RERUN_ACTION}.
069     * <p/>
070     * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
071     * <p/>
072     * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
073     */
074    @SuppressWarnings("deprecation")
075    public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
076    
077        private String rerunType;
078        private String scope;
079        private boolean refresh;
080        private boolean noCleanup;
081        private CoordinatorJobBean coordJob = null;
082        private JPAService jpaService = null;
083        protected boolean prevPending;
084    
085        /**
086         * The constructor for class {@link CoordRerunXCommand}
087         *
088         * @param jobId the job id
089         * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
090         * @param scope the rerun scope for given rerunType separated by ","
091         * @param refresh true if user wants to refresh input/output dataset urls
092         * @param noCleanup false if user wants to cleanup output events for given rerun actions
093         */
094        public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
095            super("coord_rerun", "coord_rerun", 1);
096            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
097            this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
098            this.scope = ParamChecker.notEmpty(scope, "scope");
099            this.refresh = refresh;
100            this.noCleanup = noCleanup;
101        }
102    
103        /**
104         * Check if all given actions are eligible to rerun.
105         *
106         * @param actions list of CoordinatorActionBean
107         * @return true if all actions are eligible to rerun
108         */
109        private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
110            ParamChecker.notNull(coordActions, "Coord actions to be rerun");
111            boolean ret = false;
112            for (CoordinatorActionBean coordAction : coordActions) {
113                ret = true;
114                if (!coordAction.isTerminalStatus()) {
115                    ret = false;
116                    break;
117                }
118            }
119            return ret;
120        }
121    
122        /**
123         * Get the list of actions for a given coordinator job
124         * @param rerunType the rerun type (date, action)
125         * @param jobId the coordinator job id
126         * @param scope the date scope or action id scope
127         * @return the list of Coordinator actions
128         * @throws CommandException
129         */
130        public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{
131            List<CoordinatorActionBean> coordActions = null;
132            if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
133                coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
134            }
135            else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
136                coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
137            }
138            return coordActions;
139        }
140    
141        /**
142         * Cleanup output-events directories
143         *
144         * @param eAction coordinator action xml
145         * @param user user name
146         * @param group group name
147         */
148        @SuppressWarnings("unchecked")
149        private void cleanupOutputEvents(Element eAction, String user, String group) {
150            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
151            if (outputList != null) {
152                for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
153                    if (data.getChild("uris", data.getNamespace()) != null) {
154                        String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
155                        if (uris != null) {
156                            String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
157                            FsActionExecutor fsAe = new FsActionExecutor();
158                            for (String uri : uriArr) {
159                                Path path = new Path(uri);
160                                try {
161                                    fsAe.delete(user, group, path);
162                                    LOG.debug("Cleanup the output dir " + path);
163                                }
164                                catch (ActionExecutorException ae) {
165                                    LOG.warn("Failed to cleanup the output dir " + uri, ae);
166                                }
167                            }
168                        }
169    
170                    }
171                }
172            }
173            else {
174                LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
175            }
176        }
177    
178        /**
179         * Refresh an action's input and ouput events.
180         *
181         * @param coordJob coordinator job bean
182         * @param coordAction coordinator action bean
183         * @throws Exception thrown if failed to materialize coordinator action
184         */
185        private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
186            Configuration jobConf = null;
187            try {
188                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
189            }
190            catch (IOException ioe) {
191                LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
192                throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
193            }
194            String jobXml = coordJob.getJobXml();
195            Element eJob = XmlUtils.parseXml(jobXml);
196            Date actualTime = new Date();
197            String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
198                    .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
199            LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
200                    + XmlUtils.prettyPrint(actionXml).toString());
201            coordAction.setActionXml(actionXml);
202        }
203    
204        /**
205         * Update an action into database table
206         *
207         * @param coordJob coordinator job bean
208         * @param coordAction coordinator action bean
209         * @param actionXml coordinator action xml
210         * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
211         */
212        private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
213                throws Exception {
214            LOG.debug("updateAction for actionId=" + coordAction.getId());
215            if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
216                LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
217                coordAction.setCreatedTime(new Date());
218            }
219            coordAction.setStatus(CoordinatorAction.Status.WAITING);
220            coordAction.setExternalId("");
221            coordAction.setExternalStatus("");
222            coordAction.setRerunTime(new Date());
223            coordAction.setLastModifiedTime(new Date());
224            updateList.add(coordAction);
225            writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
226        }
227    
228        /**
229         * Create SLA RegistrationEvent
230         *
231         * @param actionXml action xml
232         * @param actionBean coordinator action bean
233         * @param user user name
234         * @param group group name
235         * @throws Exception thrown if unable to write sla registration event
236         */
237        private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
238                throws Exception {
239            Element eAction = XmlUtils.parseXml(actionXml);
240            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
241            SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
242                    SlaAppType.COORDINATOR_ACTION, user, group, LOG);
243            if(slaEvent != null) {
244                insertList.add(slaEvent);
245            }
246        }
247    
248        /* (non-Javadoc)
249         * @see org.apache.oozie.command.XCommand#getEntityKey()
250         */
251        @Override
252        public String getEntityKey() {
253            return jobId;
254        }
255    
256        /* (non-Javadoc)
257         * @see org.apache.oozie.command.XCommand#isLockRequired()
258         */
259        @Override
260        protected boolean isLockRequired() {
261            return true;
262        }
263    
264        /* (non-Javadoc)
265         * @see org.apache.oozie.command.XCommand#loadState()
266         */
267        @Override
268        protected void loadState() throws CommandException {
269            jpaService = Services.get().get(JPAService.class);
270            if (jpaService == null) {
271                throw new CommandException(ErrorCode.E0610);
272            }
273            try {
274                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
275                prevPending = coordJob.isPending();
276            }
277            catch (JPAExecutorException je) {
278                throw new CommandException(je);
279            }
280            LogUtils.setLogInfo(coordJob, logInfo);
281        }
282    
283        /* (non-Javadoc)
284         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
285         */
286        @Override
287        protected void verifyPrecondition() throws CommandException, PreconditionException {
288            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus());
289            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
290                    || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
291                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
292                // Call the parent so the pending flag is reset and state transition
293                // of bundle can happen
294                if (coordJob.getBundleId() != null) {
295                    bundleStatusUpdate.call();
296                }
297                throw new CommandException(ErrorCode.E1018,
298                        "coordinator job is killed or failed so all actions are not eligible to rerun!");
299            }
300    
301            // no actioins have been created for PREP job
302            if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
303                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
304                // Call the parent so the pending flag is reset and state transition
305                // of bundle can happen
306                if (coordJob.getBundleId() != null) {
307                    bundleStatusUpdate.call();
308                }
309                throw new CommandException(ErrorCode.E1018,
310                        "coordinator job is PREP so no actions are materialized to rerun!");
311            }
312        }
313    
314        @Override
315        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
316            verifyPrecondition();
317        }
318    
319        @Override
320        public void rerunChildren() throws CommandException {
321            boolean isError = false;
322            try {
323                CoordinatorActionInfo coordInfo = null;
324                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
325                List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
326                if (checkAllActionsRunnable(coordActions)) {
327                    for (CoordinatorActionBean coordAction : coordActions) {
328                        String actionXml = coordAction.getActionXml();
329                        if (!noCleanup) {
330                            Element eAction = XmlUtils.parseXml(actionXml);
331                            cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
332                        }
333                        if (refresh) {
334                            refreshAction(coordJob, coordAction);
335                        }
336                        updateAction(coordJob, coordAction, actionXml);
337                        if (SLAService.isEnabled()) {
338                            SLAOperations.updateRegistrationEvent(coordAction.getId());
339                        }
340                        queue(new CoordActionNotificationXCommand(coordAction), 100);
341                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
342                    }
343                }
344                else {
345                    isError = true;
346                    throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
347                }
348                coordInfo = new CoordinatorActionInfo(coordActions);
349    
350                ret = coordInfo;
351            }
352            catch (XException xex) {
353                isError = true;
354                throw new CommandException(xex);
355            }
356            catch (JDOMException jex) {
357                isError = true;
358                throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
359            }
360            catch (Exception ex) {
361                isError = true;
362                throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex);
363            }
364            finally{
365                if(isError){
366                    transitToPrevious();
367                }
368            }
369        }
370    
371        /*
372         * (non-Javadoc)
373         * @see org.apache.oozie.command.TransitionXCommand#getJob()
374         */
375        @Override
376        public Job getJob() {
377            return coordJob;
378        }
379    
380        @Override
381        public void notifyParent() throws CommandException {
382            //update bundle action
383            if (getPrevStatus() != null && coordJob.getBundleId() != null) {
384                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
385                bundleStatusUpdate.call();
386            }
387        }
388    
389        @Override
390        public void updateJob() {
391            if (getPrevStatus()!= null){
392                Job.Status coordJobStatus = getPrevStatus();
393                if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
394                    coordJob.setStatus(coordJobStatus);
395                }
396                if (prevPending) {
397                    coordJob.setPending();
398                } else {
399                    coordJob.resetPending();
400                }
401            }
402            updateList.add(coordJob);
403        }
404    
405        /* (non-Javadoc)
406         * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
407         */
408        @Override
409        public void performWrites() throws CommandException {
410            try {
411                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
412                if (EventHandlerService.isEnabled()) {
413                    generateEvents(coordJob);
414                }
415            }
416            catch (JPAExecutorException e) {
417                throw new CommandException(e);
418            }
419        }
420    
421        /* (non-Javadoc)
422         * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
423         */
424        @Override
425        public XLog getLog() {
426            return LOG;
427        }
428    
429        @Override
430        public final void transitToNext() {
431            prevStatus = coordJob.getStatus();
432            if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
433                    || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
434                coordJob.setStatus(Job.Status.RUNNING);
435            }
436            else {
437                // Check for backward compatibility for Oozie versions (3.2 and before)
438                // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
439                // PAUSEDWITHERROR is not supported
440                coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
441            }
442            // used for backward support of coordinator 0.1 schema
443            coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
444            coordJob.setPending();
445        }
446    
447        private final void transitToPrevious() throws CommandException {
448            coordJob.setStatus(getPrevStatus());
449            if (!prevPending) {
450                coordJob.resetPending();
451            }
452            else {
453                coordJob.setPending();
454            }
455        }
456    }