/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.ArrayList;
023    import java.util.Date;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Set;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.oozie.CoordinatorActionBean;
031    import org.apache.oozie.CoordinatorActionInfo;
032    import org.apache.oozie.CoordinatorJobBean;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.XException;
035    import org.apache.oozie.action.ActionExecutorException;
036    import org.apache.oozie.action.hadoop.FsActionExecutor;
037    import org.apache.oozie.client.CoordinatorAction;
038    import org.apache.oozie.client.CoordinatorJob;
039    import org.apache.oozie.client.Job;
040    import org.apache.oozie.client.SLAEvent.SlaAppType;
041    import org.apache.oozie.client.rest.RestConstants;
042    import org.apache.oozie.command.CommandException;
043    import org.apache.oozie.command.PreconditionException;
044    import org.apache.oozie.command.RerunTransitionXCommand;
045    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
046    import org.apache.oozie.coord.CoordELFunctions;
047    import org.apache.oozie.coord.CoordUtils;
048    import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
049    import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
050    import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor;
051    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
052    import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
053    import org.apache.oozie.executor.jpa.JPAExecutorException;
054    import org.apache.oozie.service.JPAService;
055    import org.apache.oozie.service.Services;
056    import org.apache.oozie.util.DateUtils;
057    import org.apache.oozie.util.InstrumentUtils;
058    import org.apache.oozie.util.LogUtils;
059    import org.apache.oozie.util.ParamChecker;
060    import org.apache.oozie.util.StatusUtils;
061    import org.apache.oozie.util.XConfiguration;
062    import org.apache.oozie.util.XLog;
063    import org.apache.oozie.util.XmlUtils;
064    import org.apache.oozie.util.db.SLADbOperations;
065    import org.jdom.Element;
066    import org.jdom.JDOMException;
067    
068    /**
069     * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
070     * <p/>
071     * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or
072     * {@link RestConstants.JOB_COORD_RERUN_ACTION}.
073     * <p/>
074     * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
075     * <p/>
076     * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
077     */
078    public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
079    
080        private String rerunType;
081        private String scope;
082        private boolean refresh;
083        private boolean noCleanup;
084        private CoordinatorJobBean coordJob = null;
085        private JPAService jpaService = null;
086        protected boolean prevPending;
087    
088        /**
089         * The constructor for class {@link CoordRerunXCommand}
090         *
091         * @param jobId the job id
092         * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
093         * @param scope the rerun scope for given rerunType separated by ","
094         * @param refresh true if user wants to refresh input/output dataset urls
095         * @param noCleanup false if user wants to cleanup output events for given rerun actions
096         */
097        public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
098            super("coord_rerun", "coord_rerun", 1);
099            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
100            this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
101            this.scope = ParamChecker.notEmpty(scope, "scope");
102            this.refresh = refresh;
103            this.noCleanup = noCleanup;
104        }
105    
106        /**
107         * Check if all given actions are eligible to rerun.
108         *
109         * @param actions list of CoordinatorActionBean
110         * @return true if all actions are eligible to rerun
111         */
112        private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
113            ParamChecker.notNull(coordActions, "Coord actions to be rerun");
114            boolean ret = false;
115            for (CoordinatorActionBean coordAction : coordActions) {
116                ret = true;
117                if (!coordAction.isTerminalStatus()) {
118                    ret = false;
119                    break;
120                }
121            }
122            return ret;
123        }
124    
125        /**
126         * Get the list of actions for a given coordinator job
127         * @param rerunType the rerun type (date, action)
128         * @param jobId the coordinator job id
129         * @param scope the date scope or action id scope
130         * @return the list of Coordinator actions
131         * @throws CommandException
132         */
133        public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{
134            List<CoordinatorActionBean> coordActions = null;
135            if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
136                coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
137            }
138            else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
139                coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
140            }
141            return coordActions;
142        }
143    
144        /**
145         * Cleanup output-events directories
146         *
147         * @param eAction coordinator action xml
148         * @param user user name
149         * @param group group name
150         */
151        @SuppressWarnings("unchecked")
152        private void cleanupOutputEvents(Element eAction, String user, String group) {
153            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
154            if (outputList != null) {
155                for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
156                    if (data.getChild("uris", data.getNamespace()) != null) {
157                        String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
158                        if (uris != null) {
159                            String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
160                            FsActionExecutor fsAe = new FsActionExecutor();
161                            for (String uri : uriArr) {
162                                Path path = new Path(uri);
163                                try {
164                                    fsAe.delete(user, group, path);
165                                    LOG.debug("Cleanup the output dir " + path);
166                                }
167                                catch (ActionExecutorException ae) {
168                                    LOG.warn("Failed to cleanup the output dir " + uri, ae);
169                                }
170                            }
171                        }
172    
173                    }
174                }
175            }
176            else {
177                LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
178            }
179        }
180    
181        /**
182         * Refresh an action's input and ouput events.
183         *
184         * @param coordJob coordinator job bean
185         * @param coordAction coordinator action bean
186         * @throws Exception thrown if failed to materialize coordinator action
187         */
188        private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
189            Configuration jobConf = null;
190            try {
191                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
192            }
193            catch (IOException ioe) {
194                LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
195                throw new CommandException(ErrorCode.E1005, ioe);
196            }
197            String jobXml = coordJob.getJobXml();
198            Element eJob = XmlUtils.parseXml(jobXml);
199            Date actualTime = new Date();
200            String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
201                    .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
202            LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
203                    + XmlUtils.prettyPrint(actionXml).toString());
204            coordAction.setActionXml(actionXml);
205        }
206    
207        /**
208         * Update an action into database table
209         *
210         * @param coordJob coordinator job bean
211         * @param coordAction coordinator action bean
212         * @param actionXml coordinator action xml
213         * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
214         */
215        private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
216                throws Exception {
217            LOG.debug("updateAction for actionId=" + coordAction.getId());
218            if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
219                LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
220                coordAction.setCreatedTime(new Date());
221            }
222            coordAction.setStatus(CoordinatorAction.Status.WAITING);
223            coordAction.setExternalId("");
224            coordAction.setExternalStatus("");
225            coordAction.setRerunTime(new Date());
226            coordAction.setLastModifiedTime(new Date());
227            jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction));
228            writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
229        }
230    
231        /**
232         * Create SLA RegistrationEvent
233         *
234         * @param actionXml action xml
235         * @param actionBean coordinator action bean
236         * @param user user name
237         * @param group group name
238         * @throws Exception thrown if unable to write sla registration event
239         */
240        private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
241                throws Exception {
242            Element eAction = XmlUtils.parseXml(actionXml);
243            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
244            SLADbOperations.writeSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, group,
245                    LOG);
246        }
247    
248        /* (non-Javadoc)
249         * @see org.apache.oozie.command.XCommand#getEntityKey()
250         */
251        @Override
252        public String getEntityKey() {
253            return jobId;
254        }
255    
256        /* (non-Javadoc)
257         * @see org.apache.oozie.command.XCommand#isLockRequired()
258         */
259        @Override
260        protected boolean isLockRequired() {
261            return true;
262        }
263    
264        /* (non-Javadoc)
265         * @see org.apache.oozie.command.XCommand#loadState()
266         */
267        @Override
268        protected void loadState() throws CommandException {
269            jpaService = Services.get().get(JPAService.class);
270            if (jpaService == null) {
271                throw new CommandException(ErrorCode.E0610);
272            }
273            try {
274                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
275                prevPending = coordJob.isPending();
276            }
277            catch (JPAExecutorException je) {
278                throw new CommandException(je);
279            }
280            LogUtils.setLogInfo(coordJob, logInfo);
281        }
282    
283        /* (non-Javadoc)
284         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
285         */
286        @Override
287        protected void verifyPrecondition() throws CommandException, PreconditionException {
288            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
289                    || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
290                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
291                throw new CommandException(ErrorCode.E1018,
292                        "coordinator job is killed or failed so all actions are not eligible to rerun!");
293            }
294    
295            // no actioins have been created for PREP job
296            if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
297                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
298                throw new CommandException(ErrorCode.E1018,
299                        "coordinator job is PREP so no actions are materialized to rerun!");
300            }
301        }
302    
303        @Override
304        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
305            verifyPrecondition();
306        }
307    
308        @Override
309        public void rerunChildren() throws CommandException {
310            boolean isError = false;
311            try {
312                CoordinatorActionInfo coordInfo = null;
313                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
314                List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
315                if (checkAllActionsRunnable(coordActions)) {
316                    for (CoordinatorActionBean coordAction : coordActions) {
317                        String actionXml = coordAction.getActionXml();
318                        if (!noCleanup) {
319                            Element eAction = XmlUtils.parseXml(actionXml);
320                            cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
321                        }
322                        if (refresh) {
323                            refreshAction(coordJob, coordAction);
324                        }
325                        updateAction(coordJob, coordAction, actionXml);
326    
327                        queue(new CoordActionNotificationXCommand(coordAction), 100);
328                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
329                    }
330                }
331                else {
332                    isError = true;
333                    throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
334                }
335                coordInfo = new CoordinatorActionInfo(coordActions);
336    
337                ret = coordInfo;
338            }
339            catch (XException xex) {
340                isError = true;
341                throw new CommandException(xex);
342            }
343            catch (JDOMException jex) {
344                isError = true;
345                throw new CommandException(ErrorCode.E0700, jex);
346            }
347            catch (Exception ex) {
348                isError = true;
349                throw new CommandException(ErrorCode.E1018, ex);
350            }
351            finally{
352                if(isError){
353                    transitToPrevious();
354                }
355            }
356        }
357    
358        /*
359         * (non-Javadoc)
360         * @see org.apache.oozie.command.TransitionXCommand#getJob()
361         */
362        @Override
363        public Job getJob() {
364            return coordJob;
365        }
366    
367        @Override
368        public void notifyParent() throws CommandException {
369            //update bundle action
370            if (getPrevStatus() != null && coordJob.getBundleId() != null) {
371                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
372                bundleStatusUpdate.call();
373            }
374        }
375    
376        @Override
377        public void updateJob() throws CommandException {
378            try {
379                // rerun a paused coordinator job will keep job status at paused and pending at previous pending
380                if (getPrevStatus()!= null && getPrevStatus().equals(Job.Status.PAUSED)) {
381                    coordJob.setStatus(Job.Status.PAUSED);
382                    if (prevPending) {
383                        coordJob.setPending();
384                    } else {
385                        coordJob.resetPending();
386                    }
387                }
388    
389                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
390            }
391            catch (JPAExecutorException je) {
392                throw new CommandException(je);
393            }
394        }
395    
396        /* (non-Javadoc)
397         * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
398         */
399        @Override
400        public XLog getLog() {
401            return LOG;
402        }
403    
404        @Override
405        public final void transitToNext() {
406            prevStatus = coordJob.getStatus();
407            coordJob.setStatus(Job.Status.RUNNING);
408            // used for backward support of coordinator 0.1 schema
409            coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
410            coordJob.setPending();
411        }
412    
413        private final void transitToPrevious() throws CommandException {
414            coordJob.setStatus(getPrevStatus());
415            if (!prevPending) {
416                coordJob.resetPending();
417            }
418            else {
419                coordJob.setPending();
420            }
421        }
422    }