001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.Calendar;
024import java.util.Date;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.security.AccessControlException;
028import org.apache.oozie.CoordinatorActionBean;
029import org.apache.oozie.CoordinatorJobBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.CoordinatorAction;
032import org.apache.oozie.client.Job;
033import org.apache.oozie.command.CommandException;
034import org.apache.oozie.command.PreconditionException;
035import org.apache.oozie.coord.CoordELEvaluator;
036import org.apache.oozie.coord.CoordELFunctions;
037import org.apache.oozie.coord.input.dependency.CoordInputDependency;
038import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
039import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
040import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
043import org.apache.oozie.executor.jpa.JPAExecutorException;
044import org.apache.oozie.service.CallableQueueService;
045import org.apache.oozie.service.ConfigurationService;
046import org.apache.oozie.service.EventHandlerService;
047import org.apache.oozie.service.JPAService;
048import org.apache.oozie.service.Service;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.util.DateUtils;
051import org.apache.oozie.util.ELEvaluator;
052import org.apache.oozie.util.LogUtils;
053import org.apache.oozie.util.ParamChecker;
054import org.apache.oozie.util.StatusUtils;
055import org.apache.oozie.util.XConfiguration;
056import org.apache.oozie.util.XLog;
057import org.apache.oozie.util.XmlUtils;
058import org.jdom.Element;
059
060/**
061 * The command to check if an action's data input paths exist in the file system.
062 */
063public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
064
065    public static final String COORD_EXECUTION_NONE_TOLERANCE = "oozie.coord.execution.none.tolerance";
066
067    private final String actionId;
068    /**
069     * Property name of command re-queue interval for coordinator action input check in
070     * milliseconds.
071     */
072    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
073            + "coord.input.check.requeue.interval";
074    public static final String CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY = Service.CONF_PREFIX
075            + "coord.input.check.requeue.interval.additional.delay";
076    private CoordinatorActionBean coordAction = null;
077    private CoordinatorJobBean coordJob = null;
078    private JPAService jpaService = null;
079    private String jobId = null;
080    public CoordActionInputCheckXCommand(String actionId, String jobId) {
081        super("coord_action_input", "coord_action_input", 1);
082        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
083        this.jobId = jobId;
084    }
085
086    @Override
087    protected void setLogInfo() {
088        LogUtils.setLogInfo(actionId);
089    }
090
091    @Override
092    protected Void execute() throws CommandException {
093        LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
094
095        // this action should only get processed if current time > nominal time;
096        // otherwise, requeue this action for delay execution;
097        Date nominalTime = coordAction.getNominalTime();
098        Date currentTime = new Date();
099        if (nominalTime.compareTo(currentTime) > 0) {
100            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), nominalTime.getTime()
101                    - currentTime.getTime());
102            updateCoordAction(coordAction, false);
103            LOG.info("[" + actionId
104                    + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
105                    + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
106
107            return null;
108        }
109
110        StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
111        boolean isChangeInDependency = false;
112        try {
113            Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
114            Date now = new Date();
115            if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
116                Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, coordAction);
117                if (nextNominalTime != null) {
118                    // If the current time is after the next action's nominal time, then we've passed the window where this action
119                    // should be started; so set it to SKIPPED
120                    if (now.after(nextNominalTime)) {
121                        LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
122                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
123                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
124                        queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
125                        return null;
126                    } else {
127                        LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
128                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
129                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
130                    }
131                }
132            }
133            else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
134                // If the current time is after the nominal time of this action plus some tolerance,
135                // then we've passed the window where this action should be started; so set it to SKIPPED
136                Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
137                cal.setTime(nominalTime);
138                int tolerance = ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE);
139                cal.add(Calendar.MINUTE, tolerance);
140                if (now.after(cal.getTime())) {
141                    LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]"
142                            + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(),
143                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(nominalTime));
144                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
145                    return null;
146                } else {
147                    LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]"
148                            + " minutes later than the nominal time [{3}] of the current action]", coordAction.getId(),
149                            DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
150                }
151            }
152
153            StringBuilder existList = new StringBuilder();
154            StringBuilder nonExistList = new StringBuilder();
155            CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
156            CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
157
158
159            String missingDependencies = coordPullInputDependency.getMissingDependencies();
160            StringBuilder nonResolvedList = new StringBuilder();
161
162            CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
163            String firstMissingDependency = "";
164            // For clarity regarding which is the missing dependency in synchronous order
165            // instead of printing entire list, some of which, may be available
166            if (nonExistList.length() > 0) {
167                firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
168            }
169            LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
170                    + nonResolvedList.toString());
171
172
173            boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf);
174            String nonExistListStr = nonExistList.toString();
175            boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet();
176            if (status && nonResolvedList.length() > 0) {
177                status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false;
178            }
179            coordAction.setLastModifiedTime(currentTime);
180            coordAction.setActionXml(actionXml.toString());
181
182            isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status);
183
184            if (status && isPushDependenciesMet) {
185                String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId,
186                        coordPullInputDependency, coordPushInputDependency);
187                actionXml.replace(0, actionXml.length(), newActionXml);
188                coordAction.setActionXml(actionXml.toString());
189                coordAction.setStatus(CoordinatorAction.Status.READY);
190                updateCoordAction(coordAction, true);
191                new CoordActionReadyXCommand(coordAction.getJobId()).call();
192            }
193            else if (!isTimeout(currentTime)) {
194                if (!status) {
195                    long addtionalDelay = isChangeInDependency ? 0
196                            : ConfigurationService.getInt(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL_ADDITIONAL_DELAY)
197                                    * 1000L;
198                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
199                            addtionalDelay + getCoordInputCheckRequeueInterval());
200                }
201                updateCoordAction(coordAction, isChangeInDependency);
202            }
203            else {
204                if (!nonExistListStr.isEmpty() && isPushDependenciesMet) {
205                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
206                }
207                else {
208                    // Let CoordPushDependencyCheckXCommand queue the timeout
209                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
210                }
211                updateCoordAction(coordAction, isChangeInDependency);
212            }
213        }
214        catch (AccessControlException e) {
215            LOG.error("Permission error in ActionInputCheck", e);
216            if (isTimeout(currentTime)) {
217                LOG.debug("Queueing timeout command");
218                Services.get().get(CallableQueueService.class)
219                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
220            }
221            else {
222                // Requeue InputCheckCommand for permission denied error with longer interval
223                Services.get()
224                        .get(CallableQueueService.class)
225                        .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
226                                2 * getCoordInputCheckRequeueInterval());
227            }
228            updateCoordAction(coordAction, isChangeInDependency);
229        }
230        catch (Exception e) {
231            if (isTimeout(currentTime)) {
232                LOG.debug("Queueing timeout command");
233                // XCommand.queue() will not work when there is a Exception
234                Services.get().get(CallableQueueService.class)
235                        .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
236            }
237            updateCoordAction(coordAction, isChangeInDependency);
238            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
239        }
240        return null;
241    }
242
243    private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
244            StringBuilder nonResolvedList, boolean status) throws IOException {
245        if (nonResolvedList.length() > 0 && status == false) {
246            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
247        }
248        return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies,
249                nonResolvedList, status);
250    }
251
252    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
253            throws Exception {
254        return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null);
255    }
256
257    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId,
258            CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
259        Element eAction = XmlUtils.parseXml(actionXml.toString());
260        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies,
261                pushDependencies);
262        materializeDataProperties(eAction, actionConf, eval);
263        return XmlUtils.prettyPrint(eAction).toString();
264    }
265
266    private boolean isTimeout(Date currentTime) {
267        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
268                .getCreatedTime().getTime()))
269                / (60 * 1000);
270        int timeOut = coordAction.getTimeOut();
271        return (timeOut >= 0) && (waitingTime > timeOut);
272    }
273
274    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
275            throws CommandException {
276        coordAction.setLastModifiedTime(new Date());
277        if (jpaService != null) {
278            try {
279                if (isChangeInDependency) {
280                    coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize());
281                    CoordActionQueryExecutor.getInstance().executeUpdate(
282                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
283                    if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
284                        // since event is not to be generated unless action
285                        // RUNNING via StartX
286                        generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
287                    }
288                }
289                else {
290                    CoordActionQueryExecutor.getInstance().executeUpdate(
291                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
292                }
293            }
294            catch (Exception jex) {
295                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
296            }
297        }
298    }
299    /**
300     * This function reads the value of re-queue interval for coordinator input
301     * check command from the Oozie configuration provided by Configuration
302     * Service. If nothing defined in the configuration, it uses the code
303     * specified default value.
304     *
305     * @return re-queue interval in ms
306     */
307    public long getCoordInputCheckRequeueInterval() {
308        long requeueInterval = ConfigurationService.getLong(CONF_COORD_INPUT_CHECK_REQUEUE_INTERVAL);
309        return requeueInterval;
310    }
311
312    /**
313     * To check the list of input paths if all of them exist
314     *
315     * @param actionXml action xml
316     * @param existList the list of existed paths
317     * @param nonExistList the list of non existed paths
318     * @param conf action configuration
319     * @return true if all input paths are existed
320     * @throws Exception thrown of unable to check input path
321     */
322    protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
323            Configuration conf) throws Exception {
324        return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList,
325                nonExistList);
326    }
327
328    /**
329     * Check un resolved input.
330     *
331     * @param coordAction the coord action
332     * @param actionXml the action xml
333     * @param conf the conf
334     * @return true, if successful
335     * @throws Exception the exception
336     */
337    protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml,
338            Configuration conf) throws Exception {
339        Element eAction = XmlUtils.parseXml(actionXml.toString());
340        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
341        boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf);
342        if (allExist) {
343            actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
344        }
345        return allExist;
346    }
347
348    /**
349     * Check un resolved input.
350     *
351     * @param actionXml the action xml
352     * @param conf the conf
353     * @return true, if successful
354     * @throws Exception the exception
355     */
356    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
357        return checkUnResolvedInput(coordAction, actionXml, conf);
358    }
359
360    /**
361     * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
362     * of files that will be needed.
363     *
364     * @param eAction action element
365     * @param conf action configuration
366     * @throws Exception thrown if failed to resolve data properties
367     * @update modify 'Action' element with appropriate list of files.
368     */
369    @SuppressWarnings("unchecked")
370    static void materializeDataProperties(Element eAction, Configuration conf, ELEvaluator eval) throws Exception {
371        Element configElem = eAction.getChild("action", eAction.getNamespace()).getChild("workflow",
372                eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
373        if (configElem != null) {
374            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
375                resolveTagContents("value", propElem, eval);
376            }
377        }
378    }
379
380    /**
381     * To resolve property value which contains el functions
382     *
383     * @param tagName tag name
384     * @param elem the child element of "property" element
385     * @param eval el functions evaluator
386     * @throws Exception thrown if unable to resolve tag value
387     */
388    private static void resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws Exception {
389        if (elem == null) {
390            return;
391        }
392        Element tagElem = elem.getChild(tagName, elem.getNamespace());
393        if (tagElem != null) {
394            String updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText());
395            tagElem.removeContent();
396            tagElem.addContent(updated);
397        }
398        else {
399            XLog.getLog(CoordActionInputCheckXCommand.class).warn(" Value NOT FOUND " + tagName);
400        }
401    }
402
403    /**
404     * Check if any unsolved paths under data output. Resolve the unresolved data input paths.
405     *
406     * @param eAction action element
407     * @param actionConf action configuration
408     * @return true if successful to resolve input and output paths
409     * @throws Exception thrown if failed to resolve data input and output paths
410     */
411    @SuppressWarnings("unchecked")
412    private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction,
413            Configuration actionConf) throws Exception {
414
415        boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction);
416
417        // Using latest() or future() in output-event is not intuitive.
418        // We need to make sure, this assumption is correct.
419        Element outputList = eAction.getChild("output-events", eAction.getNamespace());
420        if (outputList != null) {
421            for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
422                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) {
423                    throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
424                            " not permitted in output-event ");
425                }
426            }
427        }
428        return ret;
429    }
430
431    /**
432     * getting the error code of the coord action. (used mainly for unit testing)
433     */
434    protected String getCoordActionErrorCode() {
435        if (coordAction != null) {
436            return coordAction.getErrorCode();
437        }
438        return null;
439    }
440
441    /**
442     * getting the error message of the coord action. (used mainly for unit testing)
443     */
444    protected String getCoordActionErrorMsg() {
445        if (coordAction != null) {
446            return coordAction.getErrorMessage();
447        }
448        return null;
449    }
450
451    @Override
452    public String getEntityKey() {
453        return this.jobId;
454    }
455
456    @Override
457    protected boolean isLockRequired() {
458        return true;
459    }
460
461    @Override
462    protected void loadState() throws CommandException {
463        if (jpaService == null) {
464            jpaService = Services.get().get(JPAService.class);
465        }
466        try {
467            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
468            coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_INPUT_CHECK,
469                    coordAction.getJobId());
470        }
471        catch (JPAExecutorException je) {
472            throw new CommandException(je);
473        }
474        LogUtils.setLogInfo(coordAction);
475    }
476
477    @Override
478    protected void verifyPrecondition() throws CommandException, PreconditionException {
479        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
480            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
481                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
482                    + coordAction.getStatus());
483        }
484
485        // if eligible to do action input check when running with backward support is true
486        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
487            return;
488        }
489
490        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR && coordJob.getStatus() != Job.Status.PAUSED
491                && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
492            throw new PreconditionException(
493                    ErrorCode.E1100, "["+ actionId + "]::CoordActionInputCheck:: Ignoring action." +
494                                " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
495                            + coordJob.getStatus());
496        }
497    }
498
499    @Override
500    public String getKey(){
501        return getName() + "_" + actionId;
502    }
503
504}