001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.Date;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.oozie.CoordinatorActionBean;
034import org.apache.oozie.CoordinatorActionInfo;
035import org.apache.oozie.CoordinatorJobBean;
036import org.apache.oozie.ErrorCode;
037import org.apache.oozie.SLAEventBean;
038import org.apache.oozie.XException;
039import org.apache.oozie.client.CoordinatorAction;
040import org.apache.oozie.client.CoordinatorJob;
041import org.apache.oozie.client.Job;
042import org.apache.oozie.client.SLAEvent.SlaAppType;
043import org.apache.oozie.client.rest.RestConstants;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.command.PreconditionException;
046import org.apache.oozie.command.RerunTransitionXCommand;
047import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
048import org.apache.oozie.coord.CoordELFunctions;
049import org.apache.oozie.coord.CoordUtils;
050import org.apache.oozie.dependency.URIHandler;
051import org.apache.oozie.dependency.URIHandler.Context;
052import org.apache.oozie.dependency.URIHandlerException;
053import org.apache.oozie.executor.jpa.BatchQueryExecutor;
054import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
055import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
056import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
057import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
058import org.apache.oozie.executor.jpa.JPAExecutorException;
059import org.apache.oozie.service.EventHandlerService;
060import org.apache.oozie.service.Services;
061import org.apache.oozie.service.URIHandlerService;
062import org.apache.oozie.sla.SLAOperations;
063import org.apache.oozie.sla.service.SLAService;
064import org.apache.oozie.util.InstrumentUtils;
065import org.apache.oozie.util.LogUtils;
066import org.apache.oozie.util.ParamChecker;
067import org.apache.oozie.util.StatusUtils;
068import org.apache.oozie.util.XConfiguration;
069import org.apache.oozie.util.XLog;
070import org.apache.oozie.util.XmlUtils;
071import org.apache.oozie.util.db.SLADbOperations;
072import org.jdom.Element;
073import org.jdom.JDOMException;
074
/**
 * Reruns coordinator actions selected by a list of dates or action ids. The caller can additionally ask for a
 * refresh of the actions and can disable the cleanup of their output.
 * <p>
 * The "rerunType" can be set as {@link RestConstants#JOB_COORD_SCOPE_DATE} or {@link RestConstants#JOB_COORD_SCOPE_ACTION}.
 * <p>
 * The "refresh" flag indicates whether the user wants to refresh an action's input and output events.
 * <p>
 * The "noCleanup" flag indicates whether the user wants to skip the cleanup of output events for the given rerun actions.
 */
084@SuppressWarnings("deprecation")
085public class CoordRerunXCommand extends RerunTransitionXCommand<CoordinatorActionInfo> {
086
087    public static final String RERUN_CONF = "rerunConf";
088    private String rerunType;
089    private String scope;
090    private boolean refresh;
091    private boolean noCleanup;
092    private CoordinatorJobBean coordJob = null;
093    protected boolean prevPending;
094    private boolean failed;
095    private Configuration actionRunConf;
096
097    /**
098     * The constructor for class {@link CoordRerunXCommand}
099     *
100     * @param jobId the job id
101     * @param rerunType rerun type {@link RestConstants#JOB_COORD_SCOPE_DATE} or {@link RestConstants#JOB_COORD_SCOPE_ACTION}
102     * @param scope the rerun scope for given rerunType separated by ","
103     * @param refresh true if user wants to refresh input/output dataset urls
104     * @param noCleanup false if user wants to cleanup output events for given rerun actions
105     * @param failed true if user wants to rerun only failed nodes
106     * @param actionRunConf configuration values for actions
107     */
108    public CoordRerunXCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup,
109                              boolean failed, Configuration actionRunConf) {
110        super("coord_rerun", "coord_rerun", 1);
111        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
112        this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType");
113        this.scope = ParamChecker.notEmpty(scope, "scope");
114        this.refresh = refresh;
115        this.noCleanup = noCleanup;
116        this.failed = failed;
117        this.actionRunConf = actionRunConf;
118    }
119
120    /**
121     * Check if all given actions are eligible to rerun.
122     *
123     * @param coordActions list of CoordinatorActionBean
124     * @return true if all actions are eligible to rerun
125     */
126    private static boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) {
127        ParamChecker.notNull(coordActions, "Coord actions to be rerun");
128        boolean ret = false;
129        for (CoordinatorActionBean coordAction : coordActions) {
130            ret = true;
131            if (!coordAction.isTerminalStatus()) {
132                ret = false;
133                break;
134            }
135        }
136        return ret;
137    }
138
139    /**
140     * Cleanup output-events directories
141     *
142     * @param eAction coordinator action xml
143     */
144    @SuppressWarnings("unchecked")
145    private void cleanupOutputEvents(Element eAction, Configuration coordJobConf, Map<String, Context> uriHandlerContextMap)
146            throws CommandException {
147        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
148        if (outputList != null) {
149
150            for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
151                String nocleanup = data.getAttributeValue("nocleanup");
152                if (data.getChild("uris", data.getNamespace()) != null
153                        && (nocleanup == null || !nocleanup.equals("true"))) {
154                    String uris = data.getChild("uris", data.getNamespace()).getTextTrim();
155                    if (uris != null) {
156                        String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR);
157                        try {
158                            for (String uriStr : uriArr) {
159                                URI uri = new URI(uriStr);
160                                URIHandler handler = Services.get().get(URIHandlerService.class).getURIHandler(uri);
161                                String schemeWithAuthority = uri.getScheme() + "://" + uri.getAuthority();
162                                if (!uriHandlerContextMap.containsKey(schemeWithAuthority)) {
163                                    Context context = handler.getContext(uri, coordJobConf, coordJob.getUser(), false);
164                                    uriHandlerContextMap.put(schemeWithAuthority, context);
165                                }
166                                handler.delete(uri, uriHandlerContextMap.get(schemeWithAuthority));
167                                LOG.info("Cleanup the output data " + uri.toString());
168                            }
169                        }
170                        catch (URISyntaxException e) {
171                            throw new CommandException(ErrorCode.E0907, e.getMessage());
172                        }
173                        catch (URIHandlerException e) {
174                            throw new CommandException(ErrorCode.E0907, e.getMessage());
175                        }
176                    }
177                }
178            }
179
180        }
181        else {
182            LOG.info("No output-events defined in coordinator xml. Therefore nothing to cleanup");
183        }
184    }
185
186    /**
187     * Refresh an action's input and ouput events.
188     *
189     * @param coordJob coordinator job bean
190     * @param coordAction coordinator action bean
191     * @throws Exception thrown if failed to materialize coordinator action
192     */
193    private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction) throws Exception {
194        Configuration jobConf = null;
195        try {
196            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
197        }
198        catch (IOException ioe) {
199            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe);
200            throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
201        }
202        String jobXml = coordJob.getJobXml();
203        Element eJob = XmlUtils.parseXml(jobXml);
204        Date actualTime = new Date();
205        String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction
206                .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction);
207        LOG.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml="
208                + XmlUtils.prettyPrint(actionXml).toString());
209        coordAction.setActionXml(actionXml);
210    }
211
212    /**
213     * Update an action into database table
214     *
215     * @param coordJob coordinator job bean
216     * @param coordAction coordinator action bean
217     * @throws Exception thrown failed to update coordinator action bean or unable to write sla registration event
218     */
219    private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction)
220            throws Exception {
221        LOG.debug("updateAction for actionId=" + coordAction.getId());
222        if (coordAction.getStatus() == CoordinatorAction.Status.TIMEDOUT) {
223            LOG.debug("Updating created time for TIMEDOUT action id =" + coordAction.getId());
224            coordAction.setCreatedTime(new Date());
225        }
226        coordAction.setStatus(CoordinatorAction.Status.WAITING);
227        if(!failed) {
228            coordAction.setExternalId(null);
229        }
230        coordAction.setExternalStatus(null);
231        coordAction.setRerunTime(new Date());
232        coordAction.setLastModifiedTime(new Date());
233        coordAction.setErrorCode("");
234        coordAction.setErrorMessage("");
235
236        // Pushing the configuration which passed through rerun.
237        if(actionRunConf != null && actionRunConf.size() > 0) {
238            Configuration createdConf = null;
239            if(coordAction.getCreatedConf() != null ) {
240                createdConf = new XConfiguration(new StringReader(coordAction.getCreatedConf()));
241            } else {
242                createdConf = new Configuration();
243            }
244            createdConf.set(RERUN_CONF, XmlUtils.prettyPrint(actionRunConf).toString());
245            coordAction.setCreatedConf(XmlUtils.prettyPrint(createdConf).toString());
246        }
247        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, coordAction));
248        writeActionRegistration(coordAction.getActionXml(), coordAction, coordJob.getUser(), coordJob.getGroup());
249    }
250
251    /**
252     * Create SLA RegistrationEvent
253     *
254     * @param actionXml action xml
255     * @param actionBean coordinator action bean
256     * @param user user name
257     * @param group group name
258     * @throws Exception thrown if unable to write sla registration event
259     */
260    private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, String user, String group)
261            throws Exception {
262        Element eAction = XmlUtils.parseXml(actionXml);
263        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
264        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(),
265                SlaAppType.COORDINATOR_ACTION, user, group, LOG);
266        if(slaEvent != null) {
267            insertList.add(slaEvent);
268        }
269    }
270
271    /* (non-Javadoc)
272     * @see org.apache.oozie.command.XCommand#getEntityKey()
273     */
274    @Override
275    public String getEntityKey() {
276        return jobId;
277    }
278
279    /* (non-Javadoc)
280     * @see org.apache.oozie.command.XCommand#isLockRequired()
281     */
282    @Override
283    protected boolean isLockRequired() {
284        return true;
285    }
286
287    /* (non-Javadoc)
288     * @see org.apache.oozie.command.XCommand#loadState()
289     */
290    @Override
291    protected void loadState() throws CommandException {
292        try {
293            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
294            prevPending = coordJob.isPending();
295        }
296        catch (JPAExecutorException je) {
297            throw new CommandException(je);
298        }
299        LogUtils.setLogInfo(coordJob);
300    }
301
302    /* (non-Javadoc)
303     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
304     */
305    @Override
306    protected void verifyPrecondition() throws CommandException, PreconditionException {
307        BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, coordJob.getStatus());
308
309        // no actions have been created for PREP job
310        if (coordJob.getStatus() == CoordinatorJob.Status.PREP || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
311            LOG.info("CoordRerunXCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" + jobId);
312            // Call the parent so the pending flag is reset and state transition
313            // of bundle can happen
314            if (coordJob.getBundleId() != null) {
315                bundleStatusUpdate.call();
316            }
317            if (coordJob.getStatus() == CoordinatorJob.Status.PREP) {
318                throw new CommandException(ErrorCode.E1018,
319                        "coordinator job is PREP so no actions are materialized to rerun!");
320            }
321            else {
322                throw new CommandException(ErrorCode.E1018,
323                        "coordinator job is IGNORED, please change it to RUNNING before rerunning actions");
324            }
325        }
326    }
327
328    @Override
329    protected void eagerLoadState() throws CommandException {
330        try {
331            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
332        }
333        catch (JPAExecutorException e) {
334            throw new CommandException(e);
335        }
336    }
337
338    @Override
339    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
340        verifyPrecondition();
341    }
342
343    @Override
344    public void rerunChildren() throws CommandException {
345        boolean isError = false;
346        try {
347            CoordinatorActionInfo coordInfo = null;
348            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
349            List<CoordinatorActionBean> coordActions = CoordUtils.getCoordActions(rerunType, jobId, scope, false);
350            if (checkAllActionsRunnable(coordActions)) {
351                Map<String, Context> uriHandlerContextMap = new HashMap<String, Context>();
352                Configuration coordJobConf = null;
353                try {
354                    coordJobConf = new XConfiguration(new StringReader(coordJob.getConf()));
355                }
356                catch (IOException e) {
357                    throw new CommandException(ErrorCode.E0907, "failed to read coord job conf to clean up output data");
358                }
359                try {
360                    for (CoordinatorActionBean coordAction : coordActions) {
361                        String actionXml = coordAction.getActionXml();
362                        // Cleanup activity should not run when failed option has been provided
363                        if (!noCleanup && !failed) {
364                            Element eAction = XmlUtils.parseXml(actionXml);
365                            cleanupOutputEvents(eAction, coordJobConf, uriHandlerContextMap);
366                        }
367                        if (refresh) {
368                            refreshAction(coordJob, coordAction);
369                        }
370                        updateAction(coordJob, coordAction);
371                        if (SLAService.isEnabled()) {
372                            SLAOperations.updateRegistrationEvent(coordAction.getId());
373                        }
374                        queue(new CoordActionNotificationXCommand(coordAction), 100);
375                        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 100);
376                        if (coordAction.getPushMissingDependencies() != null) {
377                            queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
378                        }
379                    }
380                }
381                finally {
382                    Iterator<Entry<String, Context>> itr = uriHandlerContextMap.entrySet().iterator();
383                    while (itr.hasNext()) {
384                        Entry<String, Context> entry = itr.next();
385                        entry.getValue().destroy();
386                        itr.remove();
387                    }
388                }
389            }
390            else {
391                isError = true;
392                throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!");
393            }
394            coordInfo = new CoordinatorActionInfo(coordActions);
395
396            ret = coordInfo;
397        }
398        catch (XException xex) {
399            isError = true;
400            throw new CommandException(xex);
401        }
402        catch (JDOMException jex) {
403            isError = true;
404            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
405        }
406        catch (Exception ex) {
407            isError = true;
408            throw new CommandException(ErrorCode.E1018, ex.getMessage(), ex);
409        }
410        finally{
411            if(isError){
412                transitToPrevious();
413            }
414        }
415    }
416
417    /*
418     * (non-Javadoc)
419     * @see org.apache.oozie.command.TransitionXCommand#getJob()
420     */
421    @Override
422    public Job getJob() {
423        return coordJob;
424    }
425
426    @Override
427    public void notifyParent() throws CommandException {
428        //update bundle action
429        if (getPrevStatus() != null && coordJob.getBundleId() != null) {
430            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, getPrevStatus());
431            bundleStatusUpdate.call();
432        }
433    }
434
435    @Override
436    public void updateJob() {
437        if (getPrevStatus()!= null){
438            Job.Status coordJobStatus = getPrevStatus();
439            if(coordJobStatus.equals(Job.Status.PAUSED) || coordJobStatus.equals(Job.Status.PAUSEDWITHERROR)) {
440                coordJob.setStatus(coordJobStatus);
441            }
442            if (prevPending) {
443                coordJob.setPending();
444            } else {
445                coordJob.resetPending();
446            }
447        }
448        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob));
449    }
450
451    /* (non-Javadoc)
452     * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites()
453     */
454    @Override
455    public void performWrites() throws CommandException {
456        try {
457            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
458            if (EventHandlerService.isEnabled()) {
459                generateEvents(coordJob, null);
460            }
461        }
462        catch (JPAExecutorException e) {
463            throw new CommandException(e);
464        }
465    }
466
467    /* (non-Javadoc)
468     * @see org.apache.oozie.command.RerunTransitionXCommand#getLog()
469     */
470    @Override
471    public XLog getLog() {
472        return LOG;
473    }
474
475    @Override
476    public final void transitToNext() {
477        prevStatus = coordJob.getStatus();
478        if (prevStatus == CoordinatorJob.Status.SUCCEEDED || prevStatus == CoordinatorJob.Status.PAUSED
479                || prevStatus == CoordinatorJob.Status.SUSPENDED || prevStatus == CoordinatorJob.Status.RUNNING) {
480            coordJob.setStatus(Job.Status.RUNNING);
481        }
482        else {
483            // Check for backward compatibility for Oozie versions (3.2 and before)
484            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
485            // PAUSEDWITHERROR is not supported
486            coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
487        }
488        // used for backward support of coordinator 0.1 schema
489        coordJob.setStatus(StatusUtils.getStatusForCoordRerun(coordJob, prevStatus));
490        coordJob.setPending();
491    }
492
493    private final void transitToPrevious() throws CommandException {
494        coordJob.setStatus(getPrevStatus());
495        if (!prevPending) {
496            coordJob.resetPending();
497        }
498        else {
499            coordJob.setPending();
500        }
501    }
502}