001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Set;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.oozie.CoordinatorActionBean;
031    import org.apache.oozie.CoordinatorActionInfo;
032    import org.apache.oozie.CoordinatorJobBean;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.XException;
035    import org.apache.oozie.action.ActionExecutorException;
036    import org.apache.oozie.action.hadoop.FsActionExecutor;
037    import org.apache.oozie.client.CoordinatorAction;
038    import org.apache.oozie.client.CoordinatorJob;
039    import org.apache.oozie.client.Job;
040    import org.apache.oozie.client.SLAEvent.SlaAppType;
041    import org.apache.oozie.client.rest.RestConstants;
042    import org.apache.oozie.command.CommandException;
043    import org.apache.oozie.command.PreconditionException;
044    import org.apache.oozie.command.RerunTransitionXCommand;
045    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
046    import org.apache.oozie.coord.CoordELFunctions;
047    import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
048    import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
050    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
051    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
052    import org.apache.oozie.executor.jpa.JPAExecutorException;
053    import org.apache.oozie.service.JPAService;
054    import org.apache.oozie.service.Services;
055    import org.apache.oozie.util.DateUtils;
056    import org.apache.oozie.util.InstrumentUtils;
057    import org.apache.oozie.util.LogUtils;
058    import org.apache.oozie.util.ParamChecker;
059    import org.apache.oozie.util.StatusUtils;
060    import org.apache.oozie.util.XConfiguration;
061    import org.apache.oozie.util.XLog;
062    import org.apache.oozie.util.XmlUtils;
063    import org.apache.oozie.util.db.SLADbOperations;
064    import org.jdom.Element;
065    import org.jdom.JDOMException;
066    
/**
 * Reruns coordinator actions selected by a list of dates or by a list of action ids. The user can additionally ask
 * for a refresh of the actions and can disable the cleanup of their output.
 * <p/>
 * The "rerunType" can be set as {@link RestConstants#JOB_COORD_RERUN_DATE} or
 * {@link RestConstants#JOB_COORD_RERUN_ACTION}.
 * <p/>
 * The "refresh" flag indicates that the user wants to refresh an action's input and output events.
 * <p/>
 * The "noCleanup" flag indicates that the user does NOT want the output events of the rerun actions to be cleaned
 * up; when it is false, the output directories of every rerun action are deleted before the rerun.
 */
077    public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
078    
079        private String rerunType;
080        private String scope;
081        private boolean refresh;
082        private boolean noCleanup;
083        private CoordinatorJobBean coordJob = null;
084        private JPAService jpaService = null;
085        protected boolean prevPending;
086    
087        /**
088         * The constructor for class {@link CoordRerunXCommand}
089         *
090         * @param jobId the job id
091         * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
092         * @param scope the rerun scope for given rerunType separated by ","
093         * @param refresh true if user wants to refresh input/output dataset urls
094         * @param noCleanup false if user wants to cleanup output events for given rerun actions
095         */
096        public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
097            super("coord_rerun", "coord_rerun", 1);
098            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
099            this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
100            this.scope = ParamChecker.notEmpty(scope, "scope");
101            this.refresh = refresh;
102            this.noCleanup = noCleanup;
103        }
104    
105        /**
106         * Get the list of actions for given id ranges
107         *
108         * @param jobId coordinator job id
109         * @param scope the id range to rerun separated by ","
110         * @return the list of all actions to rerun
111         * @throws CommandException thrown if failed to get coordinator actions by given id range
112         */
113        private List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException {
114            ParamChecker.notEmpty(jobId, "jobId");
115            ParamChecker.notEmpty(scope, "scope");
116    
117            Set<String> actions = new HashSet<String>();
118            String[] list = scope.split(",");
119            for (String s : list) {
120                s = s.trim();
121                if (s.contains("-")) {
122                    String[] range = s.split("-");
123                    if (range.length != 2) {
124                        throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
125                    }
126                    int start;
127                    int end;
128                    try {
129                        start = Integer.parseInt(range[0].trim());
130                        end = Integer.parseInt(range[1].trim());
131                        if (start > end) {
132                            throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
133                        }
134                    }
135                    catch (NumberFormatException ne) {
136                        throw new CommandException(ErrorCode.E0302, ne);
137                    }
138                    for (int i = start; i <= end; i++) {
139                        actions.add(jobId + "@" + i);
140                    }
141                }
142                else {
143                    try {
144                        Integer.parseInt(s);
145                    }
146                    catch (NumberFormatException ne) {
147                        throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
148                                + "'. Integer only.");
149                    }
150                    actions.add(jobId + "@" + s);
151                }
152            }
153    
154            List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
155            for (String id : actions) {
156                CoordinatorActionBean coordAction;
157                try {
158                    coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id));
159                }
160                catch (JPAExecutorException je) {
161                    throw new CommandException(je);
162                }
163                coordActions.add(coordAction);
164                LOG.debug("Rerun coordinator for actionId='" + id + "'");
165            }
166            return coordActions;
167        }
168    
169        /**
170         * Get the list of actions for given date ranges
171         *
172         * @param jobId coordinator job id
173         * @param scope the date range to rerun separated by ","
174         * @return the list of dates to rerun
175         * @throws CommandException thrown if failed to get coordinator actions by given date range
176         */
177        private List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope) throws CommandException {
178            ParamChecker.notEmpty(jobId, "jobId");
179            ParamChecker.notEmpty(scope, "scope");
180    
181            Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>();
182            String[] list = scope.split(",");
183            for (String s : list) {
184                s = s.trim();
185                if (s.contains("::")) {
186                    String[] dateRange = s.split("::");
187                    if (dateRange.length != 2) {
188                        throw new CommandException(ErrorCode.E0302, "format is wrong for date's range '" + s + "'");
189                    }
190                    Date start;
191                    Date end;
192                    try {
193                        start = DateUtils.parseDateUTC(dateRange[0].trim());
194                        end = DateUtils.parseDateUTC(dateRange[1].trim());
195                        if (start.after(end)) {
196                            throw new CommandException(ErrorCode.E0302, "start date is older than end date: '" + s + "'");
197                        }
198                    }
199                    catch (Exception e) {
200                        throw new CommandException(ErrorCode.E0302, e);
201                    }
202    
203                    List<CoordinatorActionBean> listOfActions = getActionIdsFromDateRange(jobId, start, end);
204                    actionSet.addAll(listOfActions);
205                }
206                else {
207                    try {
208                        Date date = DateUtils.parseDateUTC(s.trim());
209                        CoordinatorActionBean coordAction = jpaService
210                                .execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date));
211                        actionSet.add(coordAction);
212                    }
213                    catch (JPAExecutorException e) {
214                        throw new CommandException(e);
215                    }
216                    catch (Exception e) {
217                        throw new CommandException(ErrorCode.E0302, e);
218                    }
219                }
220            }
221    
222            List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>();
223            for (CoordinatorActionBean coordAction : actionSet) {
224                coordActions.add(coordAction);
225                LOG.debug("Rerun coordinator for actionId='" + coordAction.getId() + "'");
226            }
227            return coordActions;
228        }
229    
230        /**
231         * Get coordinator action ids between given start and end time
232         *
233         * @param jobId coordinator job id
234         * @param start start time
235         * @param end end time
236         * @return a list of coordinator actions belong to the range of start and end time
237         * @throws CommandException thrown if failed to get coordinator actions
238         */
239        private List<CoordinatorActionBean> getActionIdsFromDateRange(String jobId, Date start, Date end)
240                throws CommandException {
241            List<CoordinatorActionBean> list;
242            try {
243                list = jpaService.execute(new CoordJobGetActionsForDatesJPAExecutor(jobId, start, end));
244            }
245            catch (JPAExecutorException je) {
246                throw new CommandException(je);
247            }
248            return list;
249        }
250    
251        /**
252         * Check if all given actions are eligible to rerun.
253         *
254         * @param actions list of CoordinatorActionBean
255         * @return true if all actions are eligible to rerun
256         */
257        private boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
258            boolean ret = false;
259            for (CoordinatorActionBean coordAction : coordActions) {
260                ret = true;
261                if (!coordAction.isTerminalStatus()) {
262                    ret = false;
263                    break;
264                }
265            }
266            return ret;
267        }
268    
269        /**
270         * Cleanup output-events directories
271         *
272         * @param eAction coordinator action xml
273         * @param user user name
274         * @param group group name
275         */
276        @SuppressWarnings("unchecked")
277        private void cleanupOutputEvents(Element eAction, String user, String group) {
278            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
279            if (outputList != null) {
280                for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
281                    if (data.getChild("uris", data.getNamespace()) != null) {
282                        String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
283                        if (uris != null) {
284                            String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
285                            FsActionExecutor fsAe = new FsActionExecutor();
286                            for (String uri : uriArr) {
287                                Path path = new Path(uri);
288                                try {
289                                    fsAe.delete(user, group, path);
290                                    LOG.debug("Cleanup the output dir " + path);
291                                }
292                                catch (ActionExecutorException ae) {
293                                    LOG.warn("Failed to cleanup the output dir " + uri, ae);
294                                }
295                            }
296                        }
297    
298                    }
299                }
300            }
301            else {
302                LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
303            }
304        }
305    
306        /**
307         * Refresh an action's input and ouput events.
308         *
309         * @param coordJob coordinator job bean
310         * @param coordAction coordinator action bean
311         * @throws Exception thrown if failed to materialize coordinator action
312         */
313        private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
314            Configuration jobConf = null;
315            try {
316                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
317            }
318            catch (IOException ioe) {
319                LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
320                throw new CommandException(ErrorCode.E1005, ioe);
321            }
322            String jobXml = coordJob.getJobXml();
323            Element eJob = XmlUtils.parseXml(jobXml);
324            Date actualTime = new Date();
325            String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
326                    .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
327            LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
328                    + XmlUtils.prettyPrint(actionXml).toString());
329            coordAction.setActionXml(actionXml);
330        }
331    
332        /**
333         * Update an action into database table
334         *
335         * @param coordJob coordinator job bean
336         * @param coordAction coordinator action bean
337         * @param actionXml coordinator action xml
338         * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
339         */
340        private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
341                throws Exception {
342            LOG.debug("updateAction for actionId=" + coordAction.getId());
343            if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
344                LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
345                coordAction.setCreatedTime(new Date());
346            }
347            coordAction.setStatus(CoordinatorAction.Status.WAITING);
348            coordAction.setExternalId("");
349            coordAction.setExternalStatus("");
350            coordAction.setRerunTime(new Date());
351            coordAction.setLastModifiedTime(new Date());
352            jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
353            writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
354        }
355    
356        /**
357         * Create SLA RegistrationEvent
358         *
359         * @param actionXml action xml
360         * @param actionBean coordinator action bean
361         * @param user user name
362         * @param group group name
363         * @throws Exception thrown if unable to write sla registration event
364         */
365        private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
366                throws Exception {
367            Element eAction = XmlUtils.parseXml(actionXml);
368            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
369            SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group,
370                    LOG);
371        }
372    
373        /* (non-Javadoc)
374         * @see org.apache.oozie.command.XCommand#getEntityKey()
375         */
376        @Override
377        protected String getEntityKey() {
378            return jobId;
379        }
380    
381        /* (non-Javadoc)
382         * @see org.apache.oozie.command.XCommand#isLockRequired()
383         */
384        @Override
385        protected boolean isLockRequired() {
386            return true;
387        }
388    
389        /* (non-Javadoc)
390         * @see org.apache.oozie.command.XCommand#loadState()
391         */
392        @Override
393        protected void loadState() throws CommandException {
394            jpaService = Services.get().get(JPAService.class);
395            if (jpaService == null) {
396                throw new CommandException(ErrorCode.E0610);
397            }
398            try {
399                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
400                prevPending = coordJob.isPending();
401            }
402            catch (JPAExecutorException je) {
403                throw new CommandException(je);
404            }
405            LogUtils.setLogInfo(coordJob, logInfo);
406        }
407    
408        /* (non-Javadoc)
409         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
410         */
411        @Override
412        protected void verifyPrecondition() throws CommandException, PreconditionException {
413            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
414                    || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
415                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
416                throw new CommandException(ErrorCode.E1018,
417                        "coordinator job is killed or failed so all actions are not eligible to rerun!");
418            }
419    
420            // no actioins have been created for PREP job
421            if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
422                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
423                throw new CommandException(ErrorCode.E1018,
424                        "coordinator job is PREP so no actions are materialized to rerun!");
425            }
426        }
427    
428        @Override
429        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
430            verifyPrecondition();
431        }
432    
433        @Override
434        public void rerunChildren() throws CommandException {
435            boolean isError = false;
436            try {
437                CoordinatorActionInfo coordInfo = null;
438                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
439                List<CoordinatorActionBean> coordActions;
440                if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
441                    coordActions = getCoordActionsFromDates(jobId, scope);
442                }
443                else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
444                    coordActions = getCoordActionsFromIds(jobId, scope);
445                }
446                else {
447                    isError = true;
448                    throw new CommandException(ErrorCode.E1018, "date or action expected.");
449                }
450                if (checkAllActionsRunnable(coordActions)) {
451                    for (CoordinatorActionBean coordAction : coordActions) {
452                        String actionXml = coordAction.getActionXml();
453                        if (!noCleanup) {
454                            Element eAction = XmlUtils.parseXml(actionXml);
455                            cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
456                        }
457                        if (refresh) {
458                            refreshAction(coordJob, coordAction);
459                        }
460                        updateAction(coordJob, coordAction, actionXml);
461    
462                        queue(new CoordActionNotificationXCommand(coordAction), 100);
463                        queue(new CoordActionInputCheckXCommand(coordAction.getId()), 100);
464                    }
465                }
466                else {
467                    isError = true;
468                    throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
469                }
470                coordInfo = new CoordinatorActionInfo(coordActions);
471    
472                ret = coordInfo;
473            }
474            catch (XException xex) {
475                isError = true;
476                throw new CommandException(xex);
477            }
478            catch (JDOMException jex) {
479                isError = true;
480                throw new CommandException(ErrorCode.E0700, jex);
481            }
482            catch (Exception ex) {
483                isError = true;
484                throw new CommandException(ErrorCode.E1018, ex);
485            }
486            finally{
487                if(isError){
488                    transitToPrevious();
489                }
490            }
491        }
492    
493        /*
494         * (non-Javadoc)
495         * @see org.apache.oozie.command.TransitionXCommand#getJob()
496         */
497        @Override
498        public Job getJob() {
499            return coordJob;
500        }
501    
502        @Override
503        public void notifyParent() throws CommandException {
504            //update bundle action
505            if (getPrevStatus() != null && coordJob.getBundleId() != null) {
506                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
507                bundleStatusUpdate.call();
508            }
509        }
510    
511        @Override
512        public void updateJob() throws CommandException {
513            try {
514                // rerun a paused coordinator job will keep job status at paused and pending at previous pending
515                if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) {
516                    coordJob.setStatus(Job.Status.PAUSED);
517                    if (prevPending) {
518                        coordJob.setPending();
519                    } else {
520                        coordJob.resetPending();
521                    }
522                }
523    
524                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
525            }
526            catch (JPAExecutorException je) {
527                throw new CommandException(je);
528            }
529        }
530    
531        /* (non-Javadoc)
532         * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
533         */
534        @Override
535        public XLog getLog() {
536            return LOG;
537        }
538    
539        @Override
540        public final void transitToNext() {
541            prevStatus = coordJob.getStatus();
542            coordJob.setStatus(Job.Status.RUNNING);
543            // used for backward support of coordinator 0.1 schema
544            coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
545            coordJob.setPending();
546        }
547    
548        private final void transitToPrevious() throws CommandException {
549            coordJob.setStatus(getPrevStatus());
550            if (!prevPending) {
551                coordJob.resetPending();
552            }
553            else {
554                coordJob.setPending();
555            }
556        }
557    }