001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.util.Date;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.Path;
026import org.apache.oozie.CoordinatorActionBean;
027import org.apache.oozie.CoordinatorActionInfo;
028import org.apache.oozie.CoordinatorJobBean;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.SLAEventBean;
031import org.apache.oozie.XException;
032import org.apache.oozie.action.ActionExecutorException;
033import org.apache.oozie.action.hadoop.FsActionExecutor;
034import org.apache.oozie.client.CoordinatorAction;
035import org.apache.oozie.client.CoordinatorJob;
036import org.apache.oozie.client.Job;
037import org.apache.oozie.client.SLAEvent.SlaAppType;
038import org.apache.oozie.client.rest.RestConstants;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.PreconditionException;
041import org.apache.oozie.command.RerunTransitionXCommand;
042import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
043import org.apache.oozie.coord.CoordELFunctions;
044import org.apache.oozie.coord.CoordUtils;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
047import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
048import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
049import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
050import org.apache.oozie.executor.jpa.JPAExecutorException;
051import org.apache.oozie.service.EventHandlerService;
052import org.apache.oozie.sla.SLAOperations;
053import org.apache.oozie.sla.service.SLAService;
054import org.apache.oozie.util.InstrumentUtils;
055import org.apache.oozie.util.LogUtils;
056import org.apache.oozie.util.ParamChecker;
057import org.apache.oozie.util.StatusUtils;
058import org.apache.oozie.util.XConfiguration;
059import org.apache.oozie.util.XLog;
060import org.apache.oozie.util.XmlUtils;
061import org.apache.oozie.util.db.SLADbOperations;
062import org.jdom.Element;
063import org.jdom.JDOMException;
064
/**
 * Rerun coordinator actions selected by a list of dates or action ids. The user can additionally ask for a
 * refresh of the actions and can disable the cleanup of their output.
 * <p/>
 * The "rerunType" can be set as {@link RestConstants#JOB_COORD_RERUN_DATE} or
 * {@link RestConstants#JOB_COORD_RERUN_ACTION}.
 * <p/>
 * The "refresh" flag indicates whether the user wants to refresh an action's input and output events.
 * <p/>
 * The "noCleanup" flag indicates whether the user wants to skip the cleanup of output events for the given rerun
 * actions; the output events are cleaned up only when it is false.
 */
075@SuppressWarnings("deprecation")
076public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
077
    // Kind of selector handed to CoordUtils.getCoordActions(); per the constructor documentation it is
    // either the date-based or the action-based rerun type.
    private String rerunType;
    // Rerun scope (comma separated per the constructor documentation), interpreted according to rerunType.
    private String scope;
    // When true, the action XML of every rerun action is re-materialized before the action is updated.
    private boolean refresh;
    // When true, the output-events directories of the rerun actions are left untouched.
    private boolean noCleanup;
    // Coordinator job owning the actions; populated by eagerLoadState() and loadState().
    private CoordinatorJobBean coordJob = null;
    // Pending flag of the job as read in loadState(); used by updateJob() and transitToPrevious()
    // to put the flag back to that value.
    protected boolean prevPending;
084
085    /**
086     * The constructor for class {@link CoordRerunXCommand}
087     *
088     * @param jobId the job id
089     * @param rerunType rerun type {@link RestConstants.JOB_COORD_RERUN_DATE} or {@link RestConstants.JOB_COORD_RERUN_ACTION}
090     * @param scope the rerun scope for given rerunType separated by ","
091     * @param refresh true if user wants to refresh input/output dataset urls
092     * @param noCleanup false if user wants to cleanup output events for given rerun actions
093     */
094    public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
095        super("coord_rerun", "coord_rerun", 1);
096        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
097        this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
098        this.scope = ParamChecker.notEmpty(scope, "scope");
099        this.refresh = refresh;
100        this.noCleanup = noCleanup;
101    }
102
103    /**
104     * Check if all given actions are eligible to rerun.
105     *
106     * @param actions list of CoordinatorActionBean
107     * @return true if all actions are eligible to rerun
108     */
109    private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
110        ParamChecker.notNull(coordActions, "Coord actions to be rerun");
111        boolean ret = false;
112        for (CoordinatorActionBean coordAction : coordActions) {
113            ret = true;
114            if (!coordAction.isTerminalStatus()) {
115                ret = false;
116                break;
117            }
118        }
119        return ret;
120    }
121
122    /**
123     * Cleanup output-events directories
124     *
125     * @param eAction coordinator action xml
126     * @param user user name
127     * @param group group name
128     */
129    @SuppressWarnings("unchecked")
130    private void cleanupOutputEvents(Element eAction, String user, String group) {
131        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
132        if (outputList != null) {
133            for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
134                if (data.getChild("uris", data.getNamespace()) != null) {
135                    String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
136                    if (uris != null) {
137                        String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
138                        FsActionExecutor fsAe = new FsActionExecutor();
139                        for (String uri : uriArr) {
140                            Path path = new Path(uri);
141                            try {
142                                fsAe.delete(user, group, path);
143                                LOG.debug("Cleanup the output dir " + path);
144                            }
145                            catch (ActionExecutorException ae) {
146                                LOG.warn("Failed to cleanup the output dir " + uri, ae);
147                            }
148                        }
149                    }
150
151                }
152            }
153        }
154        else {
155            LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
156        }
157    }
158
159    /**
160     * Refresh an action's input and ouput events.
161     *
162     * @param coordJob coordinator job bean
163     * @param coordAction coordinator action bean
164     * @throws Exception thrown if failed to materialize coordinator action
165     */
166    private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
167        Configuration jobConf = null;
168        try {
169            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
170        }
171        catch (IOException ioe) {
172            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
173            throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
174        }
175        String jobXml = coordJob.getJobXml();
176        Element eJob = XmlUtils.parseXml(jobXml);
177        Date actualTime = new Date();
178        String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
179                .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
180        LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
181                + XmlUtils.prettyPrint(actionXml).toString());
182        coordAction.setActionXml(actionXml);
183    }
184
185    /**
186     * Update an action into database table
187     *
188     * @param coordJob coordinator job bean
189     * @param coordAction coordinator action bean
190     * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
191     */
192    private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction)
193            throws Exception {
194        LOG.debug("updateAction for actionId=" + coordAction.getId());
195        if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
196            LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
197            coordAction.setCreatedTime(new Date());
198        }
199        coordAction.setStatus(CoordinatorAction.Status.WAITING);
200        coordAction.setExternalId(null);
201        coordAction.setExternalStatus(null);
202        coordAction.setRerunTime(new Date());
203        coordAction.setLastModifiedTime(new Date());
204        coordAction.setErrorCode("");
205        coordAction.setErrorMessage("");
206        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, coordAction));
207        writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
208    }
209
210    /**
211     * Create SLA RegistrationEvent
212     *
213     * @param actionXml action xml
214     * @param actionBean coordinator action bean
215     * @param user user name
216     * @param group group name
217     * @throws Exception thrown if unable to write sla registration event
218     */
219    private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
220            throws Exception {
221        Element eAction = XmlUtils.parseXml(actionXml);
222        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
223        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
224                SlaAppType.COORDINATOR_ACTION, user, group, LOG);
225        if(slaEvent != null) {
226            insertList.add(slaEvent);
227        }
228    }
229
    /**
     * Returns the entity key of this command, which is the id of the coordinator job it was created for.
     *
     * @return the coordinator job id
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }
237
    /**
     * Reports whether this command needs a lock; it always answers {@code true}.
     * NOTE(review): presumably the lock is taken on the value of getEntityKey() - confirm against XCommand.
     *
     * @return {@code true}
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }
245
246    /* (non-Javadoc)
247     * @see org.apache.oozie.command.XCommand#loadState()
248     */
249    @Override
250    protected void loadState() throws CommandException {
251        try {
252            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
253            prevPending = coordJob.isPending();
254        }
255        catch (JPAExecutorException je) {
256            throw new CommandException(je);
257        }
258        LogUtils.setLogInfo(coordJob, logInfo);
259    }
260
261    /* (non-Javadoc)
262     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
263     */
264    @Override
265    protected void verifyPrecondition() throws CommandException, PreconditionException {
266        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus());
267
268        // no actions have been created for PREP job
269        if (coordJob.getStatus() == CoordinatorJob.Status.PREP || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
270            LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
271            // Call the parent so the pending flag is reset and state transition
272            // of bundle can happen
273            if (coordJob.getBundleId() != null) {
274                bundleStatusUpdate.call();
275            }
276            if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
277                throw new CommandException(ErrorCode.E1018,
278                        "coordinator job is PREP so no actions are materialized to rerun!");
279            }
280            else {
281                throw new CommandException(ErrorCode.E1018,
282                        "coordinator job is IGNORED, please change it to RUNNING before rerunning actions");
283            }
284        }
285    }
286
287    @Override
288    protected void eagerLoadState() throws CommandException {
289        try {
290            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
291        }
292        catch (JPAExecutorException e) {
293            throw new CommandException(e);
294        }
295    }
296
    /**
     * Eager precondition check; delegates to {@link #verifyPrecondition()}, so exactly the same job
     * status test is applied.
     *
     * @throws CommandException if {@link #verifyPrecondition()} rejects the job status
     * @throws PreconditionException declared by the overridden method
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        verifyPrecondition();
    }
301
302    @Override
303    public void rerunChildren() throws CommandException {
304        boolean isError = false;
305        try {
306            CoordinatorActionInfo coordInfo = null;
307            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
308            List<CoordinatorActionBean> coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false);
309            if (checkAllActionsRunnable(coordActions)) {
310                for (CoordinatorActionBean coordAction : coordActions) {
311                    String actionXml = coordAction.getActionXml();
312                    if (!noCleanup) {
313                        Element eAction = XmlUtils.parseXml(actionXml);
314                        cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup());
315                    }
316                    if (refresh) {
317                        refreshAction(coordJob, coordAction);
318                    }
319                    updateAction(coordJob, coordAction);
320                    if (SLAService.isEnabled()) {
321                        SLAOperations.updateRegistrationEvent(coordAction.getId());
322                    }
323                    queue(new CoordActionNotificationXCommand(coordAction), 100);
324                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
325                }
326            }
327            else {
328                isError = true;
329                throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
330            }
331            coordInfo = new CoordinatorActionInfo(coordActions);
332
333            ret = coordInfo;
334        }
335        catch (XException xex) {
336            isError = true;
337            throw new CommandException(xex);
338        }
339        catch (JDOMException jex) {
340            isError = true;
341            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
342        }
343        catch (Exception ex) {
344            isError = true;
345            throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex);
346        }
347        finally{
348            if(isError){
349                transitToPrevious();
350            }
351        }
352    }
353
    /**
     * Returns the coordinator job this command operates on, as loaded by {@code eagerLoadState()} or
     * {@code loadState()}; it is {@code null} until one of them has run.
     *
     * @return the coordinator job bean
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }
362
363    @Override
364    public void notifyParent() throws CommandException {
365        //update bundle action
366        if (getPrevStatus() != null && coordJob.getBundleId() != null) {
367            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
368            bundleStatusUpdate.call();
369        }
370    }
371
372    @Override
373    public void updateJob() {
374        if (getPrevStatus()!= null){
375            Job.Status coordJobStatus = getPrevStatus();
376            if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
377                coordJob.setStatus(coordJobStatus);
378            }
379            if (prevPending) {
380                coordJob.setPending();
381            } else {
382                coordJob.resetPending();
383            }
384        }
385        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob));
386    }
387
388    /* (non-Javadoc)
389     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
390     */
391    @Override
392    public void performWrites() throws CommandException {
393        try {
394            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
395            if (EventHandlerService.isEnabled()) {
396                generateEvents(coordJob, null);
397            }
398        }
399        catch (JPAExecutorException e) {
400            throw new CommandException(e);
401        }
402    }
403
    /**
     * Returns the logger used by this command.
     *
     * @return the {@code LOG} instance of this command
     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
     */
    @Override
    public XLog getLog() {
        return LOG;
    }
411
412    @Override
413    public final void transitToNext() {
414        prevStatus = coordJob.getStatus();
415        if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
416                || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
417            coordJob.setStatus(Job.Status.RUNNING);
418        }
419        else {
420            // Check for backward compatibility for Oozie versions (3.2 and before)
421            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
422            // PAUSEDWITHERROR is not supported
423            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
424        }
425        // used for backward support of coordinator 0.1 schema
426        coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
427        coordJob.setPending();
428    }
429
430    private final void transitToPrevious() throws CommandException {
431        coordJob.setStatus(getPrevStatus());
432        if (!prevPending) {
433            coordJob.resetPending();
434        }
435        else {
436            coordJob.setPending();
437        }
438    }
439}