/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.Calendar;
024import java.util.Date;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.security.AccessControlException;
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.CoordinatorAction;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.coord.CoordELEvaluator;
036import org.apache.oozie.coord.CoordELFunctions;
037import org.apache.oozie.coord.ElException;
038import org.apache.oozie.coord.input.dependency.CoordInputDependency;
039import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
041import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
043import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.service.CallableQueueService;
046import org.apache.oozie.service.ConfigurationService;
047import org.apache.oozie.service.EventHandlerService;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.Service;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.util.DateUtils;
052import org.apache.oozie.util.ELEvaluator;
053import org.apache.oozie.util.LogUtils;
054import org.apache.oozie.util.ParamChecker;
055import org.apache.oozie.util.StatusUtils;
056import org.apache.oozie.util.XConfiguration;
057import org.apache.oozie.util.XLog;
058import org.apache.oozie.util.XmlUtils;
059import org.jdom.Element;
060
061/**
062 * The command to check if an action's data input paths exist in the file system.
063 */
064public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
065
066    public static final String COORD_EXECUTION_NONE_TOLERANCE = "oozie.coord.execution.none.tolerance";
067
068    private final String actionId;
069    /**
070     * Property name of command re-queue interval for coordinator action input check in
071     * milliseconds.
072     */
073    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
074            + "coord.input.check.requeue.interval";
075    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY = Service.CONF_PREFIX
076            + "coord.input.check.requeue.interval.additional.delay";
077    private CoordinatorActionBean coordAction = null;
078    private CoordinatorJobBean coordJob = null;
079    private JPAService jpaService = null;
080    private String jobId = null;
081    public CoordActionInputCheckXCommand(String actionId, String jobId) {
082        super("coord_action_input", "coord_action_input", 1);
083        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
084        this.jobId = jobId;
085    }
086
087    @Override
088    protected void setLogInfo() {
089        LogUtils.setLogInfo(actionId);
090    }
091
092    @Override
093    protected Void execute() throws CommandException {
094        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
095
096        // this action should only get processed if current time > nominal time;
097        // otherwise, requeue this action for delay execution;
098        Date nominalTime = coordAction.getNominalTime();
099        Date currentTime = new Date();
100        if (nominalTime.compareTo(currentTime) > 0) {
101            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), nominalTime.getTime()
102                    - currentTime.getTime());
103            updateCoordAction(coordAction, false);
104            LOG.info("[" + actionId
105                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
106                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
107
108            return null;
109        }
110
111        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
112        boolean isChangeInDependency = false;
113        try {
114            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
115            Date now = new Date();
116            if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
117                Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, coordAction);
118                if (nextNominalTime != null) {
119                    // If the current time is after the next action's nominal time, then we've passed the window where this action
120                    // should be started; so set it to SKIPPED
121                    if (now.after(nextNominalTime)) {
122                        LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
123                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
124                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
125                        queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
126                        return null;
127                    } else {
128                        LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
129                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
130                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
131                    }
132                }
133            }
134            else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
135                // If the current time is after the nominal time of this action plus some tolerance,
136                // then we've passed the window where this action should be started; so set it to SKIPPED
137                Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
138                cal.setTime(nominalTime);
139                int tolerance = ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE);
140                cal.add(Calendar.MINUTE, tolerance);
141                if (now.after(cal.getTime())) {
142                    LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]"
143                            + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(),
144                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime));
145                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
146                    return null;
147                } else {
148                    LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]"
149                            + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(),
150                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
151                }
152            }
153
154            StringBuilder existList = new StringBuilder();
155            StringBuilder nonExistList = new StringBuilder();
156            CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
157            CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
158
159
160            String missingDependencies = coordPullInputDependency.getMissingDependencies();
161            StringBuilder nonResolvedList = new StringBuilder();
162
163            CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
164            String firstMissingDependency = "";
165            // For clarity regarding which is the missing dependency in synchronous order
166            // instead of printing entire list, some of which, may be available
167            if (nonExistList.length() > 0) {
168                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
169            }
170            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
171                    + nonResolvedList.toString());
172
173
174            boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf);
175            boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet();
176            if (status && nonResolvedList.length() > 0) {
177                status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false;
178            }
179            coordAction.setLastModifiedTime(currentTime);
180            coordAction.setActionXml(actionXml.toString());
181
182            isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status);
183
184            if (status && isPushDependenciesMet) {
185                moveCoordActionToReady(actionXml, actionConf, coordPullInputDependency, coordPushInputDependency);
186            }
187            else if (!isTimeout(currentTime)) {
188                if (!status) {
189                    long addtionalDelay = isChangeInDependency ? 0
190                            : ConfigurationService.getInt(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY)
191                                    * 1000L;
192                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
193                            addtionalDelay + getCoordInputCheckRequeueInterval());
194                }
195                updateCoordAction(coordAction, isChangeInDependency);
196            }
197            else {
198                if (isPushDependenciesMet) {
199                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
200                }
201                else {
202                    // Let CoordPushDependencyCheckXCommand queue the timeout
203                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
204                }
205                updateCoordAction(coordAction, isChangeInDependency);
206            }
207        }
208        catch (AccessControlException e) {
209            LOG.error("Permission error in ActionInputCheck", e);
210            if (isTimeout(currentTime)) {
211                LOG.debug("Queueing timeout command");
212                Services.get().get(CallableQueueService.class)
213                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
214            }
215            else {
216                // Requeue InputCheckCommand for permission denied error with longer interval
217                Services.get()
218                        .get(CallableQueueService.class)
219                        .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
220                                2 * getCoordInputCheckRequeueInterval());
221            }
222            updateCoordAction(coordAction, isChangeInDependency);
223        }
224        catch (Exception e) {
225            if (isTimeout(currentTime)) {
226                LOG.debug("Queueing timeout command");
227                // XCommand.queue() will not work when there is a Exception
228                Services.get().get(CallableQueueService.class)
229                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
230            }
231            updateCoordAction(coordAction, isChangeInDependency);
232            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
233        }
234        return null;
235    }
236
237    private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
238            StringBuilder nonResolvedList, boolean status) throws IOException {
239        if (nonResolvedList.length() > 0 && status == false) {
240            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
241        }
242        return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies,
243                nonResolvedList, status);
244    }
245
246    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
247            throws Exception {
248        return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null);
249    }
250
251    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId,
252            CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
253        Element eAction = XmlUtils.parseXml(actionXml.toString());
254        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies,
255                pushDependencies);
256        materializeDataProperties(eAction, actionConf, eval);
257        return XmlUtils.prettyPrint(eAction).toString();
258    }
259
260    private boolean isTimeout(Date currentTime) {
261        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
262                .getCreatedTime().getTime()))
263                / (60 * 1000);
264        int timeOut = coordAction.getTimeOut();
265        return (timeOut >= 0) && (waitingTime > timeOut);
266    }
267
268    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
269            throws CommandException {
270        coordAction.setLastModifiedTime(new Date());
271        if (jpaService != null) {
272            try {
273                if (isChangeInDependency) {
274                    coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize());
275                    CoordActionQueryExecutor.getInstance().executeUpdate(
276                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
277                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
278                        // since event is not to be generated unless action
279                        // RUNNING via StartX
280                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
281                    }
282                }
283                else {
284                    CoordActionQueryExecutor.getInstance().executeUpdate(
285                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
286                }
287            }
288            catch (Exception jex) {
289                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
290            }
291        }
292    }
293    /**
294     * This function reads the value of re-queue interval for coordinator input
295     * check command from the Oozie configuration provided by Configuration
296     * Service. If nothing defined in the configuration, it uses the code
297     * specified default value.
298     *
299     * @return re-queue interval in ms
300     */
301    public long getCoordInputCheckRequeueInterval() {
302        long requeueInterval = ConfigurationService.getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL);
303        return requeueInterval;
304    }
305
306    /**
307     * To check the list of input paths if all of them exist
308     *
309     * @param actionXml action xml
310     * @param existList the list of existed paths
311     * @param nonExistList the list of non existed paths
312     * @param conf action configuration
313     * @return true if all input paths are existed
314     * @throws Exception thrown of unable to check input path
315     */
316    protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
317            Configuration conf) throws Exception {
318        return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList,
319                nonExistList);
320    }
321
322    /**
323     * Check un resolved input.
324     *
325     * @param coordAction the coord action
326     * @param actionXml the action xml
327     * @param conf the conf
328     * @return true, if successful
329     * @throws Exception the exception
330     */
331    protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml,
332            Configuration conf) throws Exception {
333        Element eAction = XmlUtils.parseXml(actionXml.toString());
334        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
335        boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf);
336        if (allExist) {
337            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
338        }
339        return allExist;
340    }
341
342    /**
343     * Check un resolved input.
344     *
345     * @param actionXml the action xml
346     * @param conf the conf
347     * @return true, if successful
348     * @throws Exception the exception
349     */
350    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
351        return checkUnResolvedInput(coordAction, actionXml, conf);
352    }
353
354    /**
355     * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
356     * of files that will be needed.
357     *
358     * @param eAction action element
359     * @param conf action configuration
360     * @throws Exception thrown if failed to resolve data properties
361     * @update modify 'Action' element with appropriate list of files.
362     */
363    @SuppressWarnings("unchecked")
364    static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
365        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
366                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
367        if (configElem != null) {
368            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
369                resolveTagContents("value", propElem, eval);
370            }
371        }
372    }
373
374    /**
375     * To resolve property value which contains el functions
376     *
377     * @param tagName tag name
378     * @param elem the child element of "property" element
379     * @param eval el functions evaluator
380     * @throws Exception thrown if unable to resolve tag value
381     */
382    private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
383        if (elem == null) {
384            return;
385        }
386        Element tagElem = elem.getChild(tagName, elem.getNamespace());
387        if (tagElem != null) {
388            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
389            tagElem.removeContent();
390            tagElem.addContent(updated);
391        }
392        else {
393            XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
394        }
395    }
396
397    /**
398     * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
399     *
400     * @param eAction action element
401     * @param actionConf action configuration
402     * @return true if successful to resolve input and output paths
403     * @throws Exception thrown if failed to resolve data input and output paths
404     */
405    @SuppressWarnings("unchecked")
406    private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction,
407            Configuration actionConf) throws Exception {
408
409        boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction);
410
411        // Using latest() or future() in output-event is not intuitive.
412        // We need to make sure, this assumption is correct.
413        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
414        if (outputList != null) {
415            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
416                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) {
417                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
418                            " not permitted in output-event ");
419                }
420            }
421        }
422        return ret;
423    }
424
425    /**
426     * Resolves coordinator configuration and moves CoordAction to READY state
427     *
428     * @param actionXml
429     * @param actionConf
430     * @param coordPullInputDependency
431     * @param coordPushInputDependency
432     * @throws Exception
433     */
434    private void moveCoordActionToReady(StringBuilder actionXml, Configuration actionConf,
435            CoordInputDependency coordPullInputDependency, CoordInputDependency coordPushInputDependency)
436            throws Exception {
437        String newActionXml = null;
438        try {
439            newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId, coordPullInputDependency,
440                    coordPushInputDependency);
441        }
442        catch (ElException e) {
443            coordAction.setStatus(CoordinatorAction.Status.FAILED);
444            updateCoordAction(coordAction, true);
445            throw e;
446        }
447        actionXml.replace(0, actionXml.length(), newActionXml);
448        coordAction.setActionXml(actionXml.toString());
449        coordAction.setStatus(CoordinatorAction.Status.READY);
450        updateCoordAction(coordAction, true);
451        new CoordActionReadyXCommand(coordAction.getJobId()).call();
452    }
453
454    /**
455     * getting the error code of the coord action. (used mainly for unit testing)
456     */
457    protected String getCoordActionErrorCode() {
458        if (coordAction != null) {
459            return coordAction.getErrorCode();
460        }
461        return null;
462    }
463
464    /**
465     * getting the error message of the coord action. (used mainly for unit testing)
466     */
467    protected String getCoordActionErrorMsg() {
468        if (coordAction != null) {
469            return coordAction.getErrorMessage();
470        }
471        return null;
472    }
473
474    @Override
475    public String getEntityKey() {
476        return this.jobId;
477    }
478
479    @Override
480    protected boolean isLockRequired() {
481        return true;
482    }
483
484    @Override
485    protected void loadState() throws CommandException {
486        if (jpaService == null) {
487            jpaService = Services.get().get(JPAService.class);
488        }
489        try {
490            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
491            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK,
492                    coordAction.getJobId());
493        }
494        catch (JPAExecutorException je) {
495            throw new CommandException(je);
496        }
497        LogUtils.setLogInfo(coordAction);
498    }
499
500    @Override
501    protected void verifyPrecondition() throws CommandException, PreconditionException {
502        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
503            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
504                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
505                    + coordAction.getStatus());
506        }
507
508        // if eligible to do action input check when running with backward support is true
509        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
510            return;
511        }
512
513        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
514                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
515            throw new PreconditionException(
516                    ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
517                                " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
518                            + coordJob.getStatus());
519        }
520    }
521
522    @Override
523    public String getKey(){
524        return getName() + "_" + actionId;
525    }
526
527}