001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.util.Date;
023    import java.util.List;
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.fs.Path;
026    import org.apache.oozie.CoordinatorActionBean;
027    import org.apache.oozie.CoordinatorActionInfo;
028    import org.apache.oozie.CoordinatorJobBean;
029    import org.apache.oozie.ErrorCode;
030    import org.apache.oozie.SLAEventBean;
031    import org.apache.oozie.XException;
032    import org.apache.oozie.action.ActionExecutorException;
033    import org.apache.oozie.action.hadoop.FsActionExecutor;
034    import org.apache.oozie.client.CoordinatorAction;
035    import org.apache.oozie.client.CoordinatorJob;
036    import org.apache.oozie.client.Job;
037    import org.apache.oozie.client.SLAEvent.SlaAppType;
038    import org.apache.oozie.client.rest.RestConstants;
039    import org.apache.oozie.command.CommandException;
040    import org.apache.oozie.command.PreconditionException;
041    import org.apache.oozie.command.RerunTransitionXCommand;
042    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
043    import org.apache.oozie.coord.CoordELFunctions;
044    import org.apache.oozie.coord.CoordUtils;
045    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046    import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
047    import org.apache.oozie.executor.jpa.JPAExecutorException;
048    import org.apache.oozie.service.JPAService;
049    import org.apache.oozie.service.Services;
050    import org.apache.oozie.util.InstrumentUtils;
051    import org.apache.oozie.util.LogUtils;
052    import org.apache.oozie.util.ParamChecker;
053    import org.apache.oozie.util.StatusUtils;
054    import org.apache.oozie.util.XConfiguration;
055    import org.apache.oozie.util.XLog;
056    import org.apache.oozie.util.XmlUtils;
057    import org.apache.oozie.util.db.SLADbOperations;
058    import org.jdom.Element;
059    import org.jdom.JDOMException;
060    
061    /**
062     * Rerun coordinator actions by a list of dates or ids. User can specify if refresh or noCleanup.
063     * <p/>
064     * The "rerunType" can be set as {@link RestConstants.JOB_COORD_RERUN_DATE} or
065     * {@link RestConstants.JOB_COORD_RERUN_ACTION}.
066     * <p/>
067     * The "refresh" is used to indicate if user wants to refresh an action's input and output events.
068     * <p/>
069     * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions
070     */
071    public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
072    
073        private String rerunType;
074        private String scope;
075        private boolean refresh;
076        private boolean noCleanup;
077        private CoordinatorJobBean coordJob = null;
078        private JPAService jpaService = null;
079        protected boolean prevPending;
080    
081        /**
082         * The constructor for class {@link CoordRerunXCommand}
083         *
084         * @param jobId the job id
085         * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
086         * @param scope the rerun scope for given rerunType separated by ","
087         * @param refresh true if user wants to refresh input/output dataset urls
088         * @param noCleanup false if user wants to cleanup output events for given rerun actions
089         */
090        public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
091            super("coord_rerun", "coord_rerun", 1);
092            this.jobId = ParamChecker.notEmpty(jobId, "jobId");
093            this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
094            this.scope = ParamChecker.notEmpty(scope, "scope");
095            this.refresh = refresh;
096            this.noCleanup = noCleanup;
097        }
098    
099        /**
100         * Check if all given actions are eligible to rerun.
101         *
102         * @param actions list of CoordinatorActionBean
103         * @return true if all actions are eligible to rerun
104         */
105        private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
106            ParamChecker.notNull(coordActions, "Coord actions to be rerun");
107            boolean ret = false;
108            for (CoordinatorActionBean coordAction : coordActions) {
109                ret = true;
110                if (!coordAction.isTerminalStatus()) {
111                    ret = false;
112                    break;
113                }
114            }
115            return ret;
116        }
117    
118        /**
119         * Get the list of actions for a given coordinator job
120         * @param rerunType the rerun type (date, action)
121         * @param jobId the coordinator job id
122         * @param scope the date scope or action id scope
123         * @return the list of Coordinator actions
124         * @throws CommandException
125         */
126        public static List<CoordinatorActionBean> getCoordActions(String rerunType, String jobId, String scope) throws CommandException{
127            List<CoordinatorActionBean> coordActions = null;
128            if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) {
129                coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope);
130            }
131            else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) {
132                coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope);
133            }
134            return coordActions;
135        }
136    
137        /**
138         * Cleanup output-events directories
139         *
140         * @param eAction coordinator action xml
141         * @param user user name
142         * @param group group name
143         */
144        @SuppressWarnings("unchecked")
145        private void cleanupOutputEvents(Element eAction, String user, String group) {
146            Element outputList = eAction.getChild("output-events", eAction.getNamespace());
147            if (outputList != null) {
148                for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
149                    if (data.getChild("uris", data.getNamespace()) != null) {
150                        String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
151                        if (uris != null) {
152                            String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
153                            FsActionExecutor fsAe = new FsActionExecutor();
154                            for (String uri : uriArr) {
155                                Path path = new Path(uri);
156                                try {
157                                    fsAe.delete(user, group, path);
158                                    LOG.debug("Cleanup the output dir " + path);
159                                }
160                                catch (ActionExecutorException ae) {
161                                    LOG.warn("Failed to cleanup the output dir " + uri, ae);
162                                }
163                            }
164                        }
165    
166                    }
167                }
168            }
169            else {
170                LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
171            }
172        }
173    
174        /**
175         * Refresh an action's input and ouput events.
176         *
177         * @param coordJob coordinator job bean
178         * @param coordAction coordinator action bean
179         * @throws Exception thrown if failed to materialize coordinator action
180         */
181        private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
182            Configuration jobConf = null;
183            try {
184                jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
185            }
186            catch (IOException ioe) {
187                LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
188                throw new CommandException(ErrorCode.E1005, ioe);
189            }
190            String jobXml = coordJob.getJobXml();
191            Element eJob = XmlUtils.parseXml(jobXml);
192            Date actualTime = new Date();
193            String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
194                    .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
195            LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
196                    + XmlUtils.prettyPrint(actionXml).toString());
197            coordAction.setActionXml(actionXml);
198        }
199    
200        /**
201         * Update an action into database table
202         *
203         * @param coordJob coordinator job bean
204         * @param coordAction coordinator action bean
205         * @param actionXml coordinator action xml
206         * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
207         */
208        private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml)
209                throws Exception {
210            LOG.debug("updateAction for actionId=" + coordAction.getId());
211            if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
212                LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
213                coordAction.setCreatedTime(new Date());
214            }
215            coordAction.setStatus(CoordinatorAction.Status.WAITING);
216            coordAction.setExternalId("");
217            coordAction.setExternalStatus("");
218            coordAction.setRerunTime(new Date());
219            coordAction.setLastModifiedTime(new Date());
220            updateList.add(coordAction);
221            writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
222        }
223    
224        /**
225         * Create SLA RegistrationEvent
226         *
227         * @param actionXml action xml
228         * @param actionBean coordinator action bean
229         * @param user user name
230         * @param group group name
231         * @throws Exception thrown if unable to write sla registration event
232         */
233        private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
234                throws Exception {
235            Element eAction = XmlUtils.parseXml(actionXml);
236            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
237            SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
238                    SlaAppType.COORDINATOR_ACTION, user, group, LOG);
239            if(slaEvent != null) {
240                insertList.add(slaEvent);
241            }
242        }
243    
244        /* (non-Javadoc)
245         * @see org.apache.oozie.command.XCommand#getEntityKey()
246         */
247        @Override
248        public String getEntityKey() {
249            return jobId;
250        }
251    
252        /* (non-Javadoc)
253         * @see org.apache.oozie.command.XCommand#isLockRequired()
254         */
255        @Override
256        protected boolean isLockRequired() {
257            return true;
258        }
259    
260        /* (non-Javadoc)
261         * @see org.apache.oozie.command.XCommand#loadState()
262         */
263        @Override
264        protected void loadState() throws CommandException {
265            jpaService = Services.get().get(JPAService.class);
266            if (jpaService == null) {
267                throw new CommandException(ErrorCode.E0610);
268            }
269            try {
270                coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
271                prevPending = coordJob.isPending();
272            }
273            catch (JPAExecutorException je) {
274                throw new CommandException(je);
275            }
276            LogUtils.setLogInfo(coordJob, logInfo);
277        }
278    
279        /* (non-Javadoc)
280         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
281         */
282        @Override
283        protected void verifyPrecondition() throws CommandException, PreconditionException {
284            if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
285                    || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
286                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
287                throw new CommandException(ErrorCode.E1018,
288                        "coordinator job is killed or failed so all actions are not eligible to rerun!");
289            }
290    
291            // no actioins have been created for PREP job
292            if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
293                LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
294                throw new CommandException(ErrorCode.E1018,
295                        "coordinator job is PREP so no actions are materialized to rerun!");
296            }
297        }
298    
299        @Override
300        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
301            verifyPrecondition();
302        }
303    
304        @Override
305        public void rerunChildren() throws CommandException {
306            boolean isError = false;
307            try {
308                CoordinatorActionInfo coordInfo = null;
309                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
310                List<CoordinatorActionBean> coordActions = getCoordActions(rerunType, jobId, scope);
311                if (checkAllActionsRunnable(coordActions)) {
312                    for (CoordinatorActionBean coordAction : coordActions) {
313                        String actionXml = coordAction.getActionXml();
314                        if (!noCleanup) {
315                            Element eAction = XmlUtils.parseXml(actionXml);
316                            cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
317                        }
318                        if (refresh) {
319                            refreshAction(coordJob, coordAction);
320                        }
321                        updateAction(coordJob, coordAction, actionXml);
322    
323                        queue(new CoordActionNotificationXCommand(coordAction), 100);
324                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
325                    }
326                }
327                else {
328                    isError = true;
329                    throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
330                }
331                coordInfo = new CoordinatorActionInfo(coordActions);
332    
333                ret = coordInfo;
334            }
335            catch (XException xex) {
336                isError = true;
337                throw new CommandException(xex);
338            }
339            catch (JDOMException jex) {
340                isError = true;
341                throw new CommandException(ErrorCode.E0700, jex);
342            }
343            catch (Exception ex) {
344                isError = true;
345                throw new CommandException(ErrorCode.E1018, ex);
346            }
347            finally{
348                if(isError){
349                    transitToPrevious();
350                }
351            }
352        }
353    
354        /*
355         * (non-Javadoc)
356         * @see org.apache.oozie.command.TransitionXCommand#getJob()
357         */
358        @Override
359        public Job getJob() {
360            return coordJob;
361        }
362    
363        @Override
364        public void notifyParent() throws CommandException {
365            //update bundle action
366            if (getPrevStatus() != null && coordJob.getBundleId() != null) {
367                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
368                bundleStatusUpdate.call();
369            }
370        }
371    
372        @Override
373        public void updateJob() {
374            if (getPrevStatus()!= null){
375                Job.Status coordJobStatus = getPrevStatus();
376                if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
377                    coordJob.setStatus(coordJobStatus);
378                }
379                if (prevPending) {
380                    coordJob.setPending();
381                } else {
382                    coordJob.resetPending();
383                }
384            }
385    
386            updateList.add(coordJob);
387        }
388    
389        /* (non-Javadoc)
390         * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
391         */
392        @Override
393        public void performWrites() throws CommandException {
394            try {
395                jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
396            }
397            catch (JPAExecutorException e) {
398                throw new CommandException(e);
399            }
400        }
401    
402        /* (non-Javadoc)
403         * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
404         */
405        @Override
406        public XLog getLog() {
407            return LOG;
408        }
409    
410        @Override
411        public final void transitToNext() {
412            prevStatus = coordJob.getStatus();
413            if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
414                    || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
415                coordJob.setStatus(Job.Status.RUNNING);
416            }
417            else {
418                // Check for backward compatibility for Oozie versions (3.2 and before)
419                // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
420                // PAUSEDWITHERROR is not supported
421                coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
422            }
423            // used for backward support of coordinator 0.1 schema
424            coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
425            coordJob.setPending();
426        }
427    
428        private final void transitToPrevious() throws CommandException {
429            coordJob.setStatus(getPrevStatus());
430            if (!prevPending) {
431                coordJob.resetPending();
432            }
433            else {
434                coordJob.setPending();
435            }
436        }
437    }